You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/20 11:01:49 UTC
[pulsar] branch master updated: [monitoring][broker][metadata] add metadata store metrics (#17041)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 37f0a2da9b0 [monitoring][broker][metadata] add metadata store metrics (#17041)
37f0a2da9b0 is described below
commit 37f0a2da9b0749313c6dbadd779377a51a34f0de
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Tue Sep 20 19:01:41 2022 +0800
[monitoring][broker][metadata] add metadata store metrics (#17041)
---
.../rackawareness/BookieRackAffinityMapping.java | 1 +
.../pulsar/broker/resources/PulsarResources.java | 12 +-
.../apache/pulsar/PulsarClusterMetadataSetup.java | 19 ++-
.../pulsar/PulsarClusterMetadataTeardown.java | 8 +-
.../apache/pulsar/PulsarInitialNamespaceSetup.java | 2 +-
.../PulsarTransactionCoordinatorMetadataSetup.java | 2 +-
.../org/apache/pulsar/broker/PulsarService.java | 2 +
.../apache/pulsar/compaction/CompactorTool.java | 1 +
.../broker/auth/MockedPulsarServiceBaseTest.java | 14 ++-
.../broker/stats/MetadataStoreStatsTest.java | 140 +++++++++++++++++++++
.../broker/zookeeper/ClusterMetadataSetupTest.java | 4 +-
.../websocket/proxy/ProxyAuthenticationTest.java | 2 +-
.../websocket/proxy/ProxyAuthorizationTest.java | 2 +-
.../websocket/proxy/ProxyConfigurationTest.java | 2 +-
.../proxy/ProxyEncryptionPublishConsumeTest.java | 2 +-
.../websocket/proxy/ProxyPublishConsumeTest.java | 2 +-
.../proxy/ProxyPublishConsumeTlsTest.java | 2 +-
.../proxy/ProxyPublishConsumeWithoutZKTest.java | 2 +-
.../proxy/v1/V1_ProxyAuthenticationTest.java | 2 +-
.../PulsarMetadataStateStoreProviderImpl.java | 3 +-
.../org/apache/pulsar/functions/worker/Worker.java | 2 +-
pulsar-metadata/pom.xml | 5 +
.../pulsar/metadata/api/MetadataStoreConfig.java | 9 ++
.../bookkeeper/AbstractMetadataDriver.java | 1 +
.../pulsar/metadata/bookkeeper/BKCluster.java | 3 +-
.../metadata/impl/AbstractMetadataStore.java | 71 +++++++++--
.../metadata/impl/LocalMemoryMetadataStore.java | 1 +
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 1 +
.../batching/AbstractBatchedMetadataStore.java | 2 +-
.../metadata/impl/stats/MetadataStoreStats.java | 110 ++++++++++++++++
.../pulsar/metadata/impl/stats/package-info.java | 19 +++
.../apache/pulsar/proxy/server/ProxyService.java | 4 +-
.../pulsar/sql/presto/PulsarConnectorCache.java | 2 +-
.../pulsar/testclient/ManagedLedgerWriter.java | 2 +-
.../apache/pulsar/websocket/WebSocketService.java | 6 +-
35 files changed, 417 insertions(+), 45 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index c0c29637114..dab42558d01 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -98,6 +98,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout"));
store = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder()
+ .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.sessionTimeoutMillis(zkTimeout)
.build());
} catch (MetadataStoreException e) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index a087d8090d3..9cfeddd123d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -88,9 +88,17 @@ public class PulsarResources {
this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore);
}
- public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs)
+ public static MetadataStoreExtended createLocalMetadataStore(String serverUrls, int sessionTimeoutMs)
throws MetadataStoreException {
return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder()
- .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build());
+ .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false)
+ .metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
+ }
+
+ public static MetadataStoreExtended createConfigMetadataStore(String serverUrls, int sessionTimeoutMs)
+ throws MetadataStoreException {
+ return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder()
+ .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false)
+ .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index a06378d20ff..a95e6121b3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -248,8 +248,8 @@ public class PulsarClusterMetadataSetup {
arguments.metadataStoreUrl, arguments.configurationMetadataStore);
MetadataStoreExtended localStore =
- initMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis);
- MetadataStoreExtended configStore = initMetadataStore(arguments.configurationMetadataStore,
+ initLocalMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis);
+ MetadataStoreExtended configStore = initConfigMetadataStore(arguments.configurationMetadataStore,
arguments.zkSessionTimeoutMillis);
final String metadataStoreUrlNoIdentifer = MetadataStoreFactoryImpl
@@ -389,9 +389,22 @@ public class PulsarClusterMetadataSetup {
}
}
- public static MetadataStoreExtended initMetadataStore(String connection, int sessionTimeout) throws Exception {
+ public static MetadataStoreExtended initLocalMetadataStore(String connection, int sessionTimeout) throws Exception {
MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
.sessionTimeoutMillis(sessionTimeout)
+ .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
+ .build());
+ if (store instanceof MetadataStoreLifecycle) {
+ ((MetadataStoreLifecycle) store).initializeCluster().get();
+ }
+ return store;
+ }
+
+ public static MetadataStoreExtended initConfigMetadataStore(String connection, int sessionTimeout)
+ throws Exception {
+ MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
+ .sessionTimeoutMillis(sessionTimeout)
+ .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.build());
if (store instanceof MetadataStoreLifecycle) {
((MetadataStoreLifecycle) store).initializeCluster().get();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index d0da6a5a2bd..d0ba5c47f75 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -99,7 +99,10 @@ public class PulsarClusterMetadataTeardown {
@Cleanup
MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeper,
- MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
+ MetadataStoreConfig.builder()
+ .sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
+ .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
+ .build());
if (arguments.bkMetadataServiceUri != null) {
@Cleanup
@@ -121,7 +124,8 @@ public class PulsarClusterMetadataTeardown {
// Should it be done by REST API before broker is down?
@Cleanup
MetadataStore configMetadataStore = MetadataStoreFactory.create(arguments.configurationStore,
- MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
+ MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
+ .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java
index 15e4792fd97..bb27be60b35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java
@@ -83,7 +83,7 @@ public class PulsarInitialNamespaceSetup {
}
try (MetadataStore configStore = PulsarClusterMetadataSetup
- .initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
+ .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
PulsarResources pulsarResources = new PulsarResources(null, configStore);
for (String namespace : arguments.namespaces) {
NamespaceName namespaceName = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
index 66607dd4c0a..b1fae8753ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
@@ -91,7 +91,7 @@ public class PulsarTransactionCoordinatorMetadataSetup {
}
try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup
- .initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
+ .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
PulsarResources pulsarResources = new PulsarResources(null, configStore);
// Create system tenant
PulsarClusterMetadataSetup
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 750a56e0c86..c69c6e2418f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -362,6 +362,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
+ .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.synchronizer(synchronizer)
.build());
}
@@ -1045,6 +1046,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.synchronizer(synchronizer)
+ .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.build());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index a54135de9a8..4cb1fd347df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -143,6 +143,7 @@ public class CompactorTool {
MetadataStoreExtended store = MetadataStoreExtended.create(brokerConfig.getMetadataStoreUrl(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int) brokerConfig.getMetadataStoreSessionTimeoutMillis())
+ .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.build());
@Cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index ba1b4885d76..f70b3b43ab9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -381,21 +381,27 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
}
protected MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
- return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder().synchronizer(synchronizer).build());
+ return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder()
+ .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
+ .synchronizer(synchronizer).build());
}
protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
- return new ZKMetadataStore(mockZooKeeper);
+ return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder()
+ .metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
}
protected MetadataStoreExtended createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
return new ZKMetadataStore(mockZooKeeperGlobal,
- MetadataStoreConfig.builder().synchronizer(synchronizer).build());
+ MetadataStoreConfig.builder()
+ .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+ .synchronizer(synchronizer).build());
}
protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
- return new ZKMetadataStore(mockZooKeeperGlobal);
+ return new ZKMetadataStore(mockZooKeeperGlobal, MetadataStoreConfig.builder()
+ .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
}
private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
new file mode 100644
index 00000000000..eba134a2c8d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import com.google.common.collect.Multimap;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+@Test(groups = "broker")
+public class MetadataStoreStatsTest extends BrokerTestBase {
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ conf.setTopicLevelPoliciesEnabled(false);
+ conf.setSystemTopicEnabled(false);
+ super.baseSetup();
+ AuthenticationProviderToken.resetMetrics();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ resetConfig();
+ }
+
+ @Test
+ public void testMetadataStoreStats() throws Exception {
+ String ns = "prop/ns-abc1";
+ admin.namespaces().createNamespace(ns);
+
+ String topic = "persistent://prop/ns-abc1/metadata-store-" + UUID.randomUUID();
+ String subName = "my-sub1";
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic).create();
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic).subscriptionName(subName).subscribe();
+
+ for (int i = 0; i < 100; i++) {
+ producer.newMessage().value(UUID.randomUUID().toString()).send();
+ }
+
+ for (;;) {
+ Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ consumer.acknowledge(message);
+ }
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
+ String metricsStr = output.toString();
+ Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+ Collection<PrometheusMetricsTest.Metric> opsLatency = metricsMap.get("pulsar_metadata_store_ops_latency_ms" + "_sum");
+ Collection<PrometheusMetricsTest.Metric> putBytes = metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");
+
+ Assert.assertTrue(opsLatency.size() > 1);
+ Assert.assertTrue(putBytes.size() > 1);
+
+ for (PrometheusMetricsTest.Metric m : opsLatency) {
+ Assert.assertEquals(m.tags.get("cluster"), "test");
+ String metadataStoreName = m.tags.get("name");
+ Assert.assertNotNull(metadataStoreName);
+ Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+ Assert.assertNotNull(m.tags.get("status"));
+
+ if (m.tags.get("status").equals("success")) {
+ if (m.tags.get("type").equals("get")) {
+ Assert.assertTrue(m.value >= 0);
+ } else if (m.tags.get("type").equals("del")) {
+ Assert.assertTrue(m.value >= 0);
+ } else if (m.tags.get("type").equals("put")) {
+ Assert.assertTrue(m.value >= 0);
+ } else {
+ Assert.fail();
+ }
+ } else {
+ if (m.tags.get("type").equals("get")) {
+ Assert.assertTrue(m.value >= 0);
+ } else if (m.tags.get("type").equals("del")) {
+ Assert.assertTrue(m.value >= 0);
+ } else if (m.tags.get("type").equals("put")) {
+ Assert.assertTrue(m.value >= 0);
+ } else {
+ Assert.fail();
+ }
+ }
+ }
+ for (PrometheusMetricsTest.Metric m : putBytes) {
+ Assert.assertEquals(m.tags.get("cluster"), "test");
+ String metadataStoreName = m.tags.get("name");
+ Assert.assertNotNull(metadataStoreName);
+ Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+ Assert.assertTrue(m.value > 0);
+ }
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
index 84a42aace85..75b385dba4d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
@@ -251,7 +251,7 @@ public class ClusterMetadataSetupTest {
PulsarClusterMetadataSetup.main(args);
try (MetadataStoreExtended localStore = PulsarClusterMetadataSetup
- .initMetadataStore(zkConnection, 30000)) {
+ .initLocalMetadataStore(zkConnection, 30000)) {
// expected not exist
assertFalse(localStore.exists("/ledgers").get());
@@ -268,7 +268,7 @@ public class ClusterMetadataSetupTest {
PulsarClusterMetadataSetup.main(bookkeeperMetadataServiceUriArgs);
try (MetadataStoreExtended bookkeeperMetadataServiceUriStore = PulsarClusterMetadataSetup
- .initMetadataStore(zkConnection, 30000)) {
+ .initLocalMetadataStore(zkConnection, 30000)) {
// expected not exist
assertFalse(bookkeeperMetadataServiceUriStore.exists("/ledgers").get());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index a34ec879ba6..35a7aa44391 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -83,7 +83,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
}
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index 7e0ee1bd466..982f4cd48eb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -64,7 +64,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
config.setWebServicePort(Optional.of(0));
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
service.start();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index 26cf8a0e154..e6eba5e32cb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -67,7 +67,7 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
config.setServiceUrl("http://localhost:8080");
config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
service.start();
PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
index bbd2b3bd14f..87741e5bede 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
@@ -79,7 +79,7 @@ public class ProxyEncryptionPublishConsumeTest extends ProducerConsumerBase {
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName());
WebSocketService service = spy(new WebSocketService(config));
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 918640642ec..951a3db4f02 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -101,7 +101,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
config.setClusterName("test");
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index a8b67416107..5e40a7535a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -74,7 +74,7 @@ public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase {
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index 1fb12645e5e..c4f4e876c69 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -62,7 +62,7 @@ public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
config.setServiceUrl(pulsar.getSafeWebServiceAddress());
config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createConfigMetadataStore(anyString(), anyInt());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index b80c3fb07be..09a7d3a90fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -85,7 +85,7 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase {
}
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
index 819bfd94cb7..0fcc4c56b40 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
@@ -47,7 +47,8 @@ public class PulsarMetadataStateStoreProviderImpl implements StateStoreProvider
shouldCloseStore = false;
} else {
String metadataUrl = (String) config.get(METADATA_URL);
- store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build());
+ store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder()
+ .metadataStoreName(MetadataStoreConfig.STATE_METADATA_STORE).build());
shouldCloseStore = true;
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 0ff6dbc0431..d4d4873737d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -74,7 +74,7 @@ public class Worker {
log.info("starting configuration cache service");
try {
- configMetadataStore = PulsarResources.createMetadataStore(
+ configMetadataStore = PulsarResources.createConfigMetadataStore(
workerConfig.getConfigurationMetadataStoreUrl(),
(int) workerConfig.getMetadataStoreSessionTimeoutMillis());
} catch (IOException e) {
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index b34271f7f32..7f8c3e93754 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -99,6 +99,11 @@
<artifactId>caffeine</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient</artifactId>
+ </dependency>
+
</dependencies>
<build>
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index e1d0ede7d2a..f00cc431480 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -29,6 +29,9 @@ import lombok.ToString;
@Getter
@ToString
public class MetadataStoreConfig {
+ public static final String METADATA_STORE = "metadata-store";
+ public static final String STATE_METADATA_STORE = "state-metadata-store";
+ public static final String CONFIGURATION_METADATA_STORE = "configuration-metadata-store";
/**
* The (implementation specific) session timeout, in milliseconds.
@@ -72,6 +75,12 @@ public class MetadataStoreConfig {
@Builder.Default
private final int batchingMaxSizeKb = 128;
+ /**
+ * The name of a metadata store.
+ */
+ @Builder.Default
+ private final String metadataStoreName = "";
+
/**
* Pluggable MetadataEventSynchronizer to sync metadata events across the
* separate clusters.
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
index 76a14300d0b..8a9984d626d 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
@@ -115,6 +115,7 @@ public abstract class AbstractMetadataDriver implements Closeable {
this.store = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder()
.sessionTimeoutMillis(conf.getZkTimeout())
+ .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.build());
this.storeInstanceIsOwned = true;
} catch (MetadataStoreException e) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
index 36b0be112e2..24fa4730854 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
@@ -119,7 +119,8 @@ public class BKCluster implements AutoCloseable {
this.baseClientConf = newBaseClientConfiguration();
this.store =
- MetadataStoreExtended.create(clusterConf.metadataServiceUri, MetadataStoreConfig.builder().build());
+ MetadataStoreExtended.create(clusterConf.metadataServiceUri, MetadataStoreConfig.builder()
+ .metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
baseConf.setJournalRemovePagesFromCache(false);
baseConf.setProperty(AbstractMetadataDriver.METADATA_STORE_INSTANCE, store);
baseClientConf.setProperty(AbstractMetadataDriver.METADATA_STORE_INSTANCE, store);
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index f7e90dc8e60..96c97b31e51 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -35,9 +35,9 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -59,18 +59,20 @@ import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
+import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
@Slf4j
public abstract class AbstractMetadataStore implements MetadataStoreExtended, Consumer<Notification> {
-
private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
+ protected final String metadataStoreName;
protected final ScheduledExecutorService executor;
private final AsyncLoadingCache<String, List<String>> childrenCache;
private final AsyncLoadingCache<String, Boolean> existsCache;
private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
+ private final MetadataStoreStats metadataStoreStats;
// We don't strictly need to use 'volatile' here because we don't need the precise consistent semantic. Instead,
// we want to avoid the overhead of 'volatile'.
@@ -81,9 +83,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
protected abstract CompletableFuture<Boolean> existsFromStore(String path);
- protected AbstractMetadataStore() {
- this.executor = Executors
- .newSingleThreadScheduledExecutor(new DefaultThreadFactory("metadata-store"));
+ protected AbstractMetadataStore(String metadataStoreName) {
+ this.executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory(metadataStoreName));
registerListener(this);
this.childrenCache = Caffeine.newBuilder()
@@ -127,6 +128,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
}
}
});
+
+ this.metadataStoreName = metadataStoreName;
+ this.metadataStoreStats = new MetadataStoreStats(metadataStoreName);
}
protected void registerSyncLister(Optional<MetadataEventSynchronizer> synchronizer) {
@@ -236,10 +240,20 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
@Override
public CompletableFuture<Optional<GetResult>> get(String path) {
+ // Wall-clock start of the operation; every outcome below reports (now - start) in milliseconds.
+ long start = System.currentTimeMillis();
if (!isValidPath(path)) {
- return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
+ // An invalid path never reaches storeGet(); it is still recorded as a failed get.
+ metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - start);
+ return FutureUtil
+ .failedFuture(new MetadataStoreException.InvalidPathException(path));
}
- return storeGet(path);
+ // The latency is recorded when the store's future completes, as failed if it completed
+ // exceptionally and as succeeded otherwise.
+ return storeGet(path)
+ .whenComplete((v, t) -> {
+ if (t != null) {
+ metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - start);
+ } else {
+ metadataStoreStats.recordGetOpsSucceeded(System.currentTimeMillis() - start);
+ }
+ });
}
protected abstract CompletableFuture<Optional<GetResult>> storeGet(String path);
@@ -314,7 +328,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
@Override
public final CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
+ long start = System.currentTimeMillis();
if (!isValidPath(path)) {
+ metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
if (getMetadataEventSynchronizer().isPresent()) {
@@ -322,9 +338,23 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
expectedVersion.orElse(null), Instant.now().toEpochMilli(),
getMetadataEventSynchronizer().get().getClusterName(), NotificationType.Deleted);
return getMetadataEventSynchronizer().get().notify(event)
- .thenCompose(__ -> deleteInternal(path, expectedVersion));
+ .thenCompose(__ -> deleteInternal(path, expectedVersion))
+ .whenComplete((v, t) -> {
+ if (null != t) {
+ metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
+ } else {
+ metadataStoreStats.recordDelOpsSucceeded(System.currentTimeMillis() - start);
+ }
+ });
} else {
- return deleteInternal(path, expectedVersion);
+ return deleteInternal(path, expectedVersion)
+ .whenComplete((v, t) -> {
+ if (null != t) {
+ metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
+ } else {
+ metadataStoreStats.recordDelOpsSucceeded(System.currentTimeMillis() - start);
+ }
+ });
}
}
@@ -364,7 +394,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
@Override
public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
+ long start = System.currentTimeMillis();
if (!isValidPath(path)) {
+ metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
HashSet<CreateOption> ops = new HashSet<>(options);
@@ -375,9 +407,25 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
Instant.now().toEpochMilli(), getMetadataEventSynchronizer().get().getClusterName(),
NotificationType.Modified);
return getMetadataEventSynchronizer().get().notify(event)
- .thenCompose(__ -> putInternal(path, data, optExpectedVersion, options));
+ .thenCompose(__ -> putInternal(path, data, optExpectedVersion, options))
+ .whenComplete((v, t) -> {
+ if (t != null) {
+ metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
+ } else {
+ int len = data == null ? 0 : data.length;
+ metadataStoreStats.recordPutOpsSucceeded(System.currentTimeMillis() - start, len);
+ }
+ });
} else {
- return putInternal(path, data, optExpectedVersion, options);
+ return putInternal(path, data, optExpectedVersion, options)
+ .whenComplete((v, t) -> {
+ if (t != null) {
+ metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
+ } else {
+ int len = data == null ? 0 : data.length;
+ metadataStoreStats.recordPutOpsSucceeded(System.currentTimeMillis() - start, len);
+ }
+ });
}
}
@@ -428,6 +476,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
public void close() throws Exception {
- executor.shutdownNow();
- executor.awaitTermination(10, TimeUnit.SECONDS);
+        // Stops the store's executor, waits up to 10 seconds for it to terminate, and releases
+        // the store's metrics.
+        try {
+            executor.shutdownNow();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+        } finally {
+            // Release the metrics even when the wait above is interrupted; otherwise the label
+            // children of this store would stay registered in the static collectors.
+            this.metadataStoreStats.close();
+        }
}
@VisibleForTesting
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 924a6ac5d6d..b520745a406 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -75,6 +75,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
+ super(metadataStoreConfig.getMetadataStoreName());
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
// update synchronizer and register sync listener
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 69af0e76914..b276d3cb7a8 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -216,6 +216,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
*/
private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
+ super(metadataStoreConfig.getMetadataStoreName());
this.metadataUrl = metadataURL;
try {
RocksDB.loadLibrary();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index c9d245b8caf..de7088d74af 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -53,7 +53,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
private MetadataEventSynchronizer synchronizer;
protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
- super();
+ super(conf.getMetadataStoreName());
this.enabled = conf.isBatchingEnabled();
this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
new file mode 100644
index 00000000000..2351ba1a591
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.stats;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Latency and payload-size metrics for one metadata store instance.
+ *
+ * <p>The collectors are static and shared by every instance; an instance only holds the label
+ * children selected by its metadata store name. Several stores may be created with the same
+ * name, in which case they share the same children. The children are therefore removed from
+ * the collectors only when the last open instance using that name is closed.
+ */
+public final class MetadataStoreStats implements AutoCloseable {
+    private static final double[] BUCKETS = new double[]{1, 3, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000};
+    private static final String OPS_TYPE_LABEL_NAME = "type";
+    private static final String METADATA_STORE_LABEL_NAME = "name";
+    private static final String STATUS = "status";
+
+    private static final String OPS_TYPE_GET = "get";
+    private static final String OPS_TYPE_DEL = "del";
+    private static final String OPS_TYPE_PUT = "put";
+    private static final String STATUS_SUCCESS = "success";
+    private static final String STATUS_FAIL = "fail";
+
+    protected static final String PREFIX = "pulsar_metadata_store_";
+
+    /** Operation latency in milliseconds, labelled by store name, operation type and status. */
+    private static final Histogram OPS_LATENCY = Histogram
+            .build(PREFIX + "ops_latency", "-")
+            .unit("ms")
+            .buckets(BUCKETS)
+            .labelNames(METADATA_STORE_LABEL_NAME, OPS_TYPE_LABEL_NAME, STATUS)
+            .register();
+    /** Bytes written by successful put operations, labelled by store name. */
+    private static final Counter PUT_BYTES = Counter
+            .build(PREFIX + "put", "-")
+            .unit("bytes")
+            .labelNames(METADATA_STORE_LABEL_NAME)
+            .register();
+
+    /**
+     * Number of open instances per metadata store name. Every access is made while holding the
+     * monitor of this map, which also serializes child creation against child removal.
+     */
+    private static final Map<String, Integer> LIVE_INSTANCES = new HashMap<>();
+
+    private final Histogram.Child getOpsSucceedChild;
+    private final Histogram.Child delOpsSucceedChild;
+    private final Histogram.Child putOpsSucceedChild;
+    private final Histogram.Child getOpsFailedChild;
+    private final Histogram.Child delOpsFailedChild;
+    private final Histogram.Child putOpsFailedChild;
+    private final Counter.Child putBytesChild;
+    private final String metadataStoreName;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    /**
+     * Binds this instance to the label children of the given store name.
+     *
+     * @param metadataStoreName value used for the {@code name} label of every metric
+     */
+    public MetadataStoreStats(String metadataStoreName) {
+        this.metadataStoreName = metadataStoreName;
+
+        // Children are looked up and the instance is counted under the same lock used by close(),
+        // so a concurrent close() of another instance cannot remove the children in between.
+        synchronized (LIVE_INSTANCES) {
+            this.getOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_GET, STATUS_SUCCESS);
+            this.delOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_DEL, STATUS_SUCCESS);
+            this.putOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_PUT, STATUS_SUCCESS);
+            this.getOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_GET, STATUS_FAIL);
+            this.delOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_DEL, STATUS_FAIL);
+            this.putOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_PUT, STATUS_FAIL);
+            this.putBytesChild = PUT_BYTES.labels(metadataStoreName);
+            LIVE_INSTANCES.merge(metadataStoreName, 1, Integer::sum);
+        }
+    }
+
+    /** Records the latency, in milliseconds, of a successful get. */
+    public void recordGetOpsSucceeded(long millis) {
+        this.getOpsSucceedChild.observe(millis);
+    }
+
+    /** Records the latency, in milliseconds, of a successful delete. */
+    public void recordDelOpsSucceeded(long millis) {
+        this.delOpsSucceedChild.observe(millis);
+    }
+
+    /** Records the latency, in milliseconds, of a successful put and adds its payload size. */
+    public void recordPutOpsSucceeded(long millis, int bytes) {
+        this.putOpsSucceedChild.observe(millis);
+        this.putBytesChild.inc(bytes);
+    }
+
+    /** Records the latency, in milliseconds, of a failed get. */
+    public void recordGetOpsFailed(long millis) {
+        this.getOpsFailedChild.observe(millis);
+    }
+
+    /** Records the latency, in milliseconds, of a failed delete. */
+    public void recordDelOpsFailed(long millis) {
+        this.delOpsFailedChild.observe(millis);
+    }
+
+    /** Records the latency, in milliseconds, of a failed put. */
+    public void recordPutOpsFailed(long millis) {
+        this.putOpsFailedChild.observe(millis);
+    }
+
+    /**
+     * Releases this instance. Calling it more than once has no further effect. The label children
+     * of this store name are removed from the collectors only when no other open instance uses
+     * the same name; otherwise those instances would keep observing into children that are no
+     * longer exported.
+     */
+    @Override
+    public void close() throws Exception {
+        if (!this.closed.compareAndSet(false, true)) {
+            return;
+        }
+        synchronized (LIVE_INSTANCES) {
+            int remaining = LIVE_INSTANCES.getOrDefault(this.metadataStoreName, 1) - 1;
+            if (remaining > 0) {
+                LIVE_INSTANCES.put(this.metadataStoreName, remaining);
+                return;
+            }
+            LIVE_INSTANCES.remove(this.metadataStoreName);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_GET, STATUS_SUCCESS);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_DEL, STATUS_SUCCESS);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_PUT, STATUS_SUCCESS);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_GET, STATUS_FAIL);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_DEL, STATUS_FAIL);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_PUT, STATUS_FAIL);
+            PUT_BYTES.remove(this.metadataStoreName);
+        }
+    }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/package-info.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/package-info.java
new file mode 100644
index 00000000000..15ca0d1c582
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.stats;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 8b8b474e5e3..cc1e4cbc880 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -447,12 +447,12 @@ public class ProxyService implements Closeable {
}
+ /**
+ * Creates the local metadata store by delegating to {@code PulsarResources.createLocalMetadataStore}
+ * with the metadata store URL and session timeout taken from {@code proxyConfig}.
+ *
+ * @return the store returned by {@code PulsarResources.createLocalMetadataStore}
+ * @throws MetadataStoreException propagated from the delegate
+ */
public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
- return PulsarResources.createMetadataStore(proxyConfig.getMetadataStoreUrl(),
+ return PulsarResources.createLocalMetadataStore(proxyConfig.getMetadataStoreUrl(),
proxyConfig.getMetadataStoreSessionTimeoutMillis());
}
+ /**
+ * Creates the configuration metadata store by delegating to
+ * {@code PulsarResources.createConfigMetadataStore} with the configuration metadata store URL
+ * and session timeout taken from {@code proxyConfig}.
+ *
+ * @return the store returned by {@code PulsarResources.createConfigMetadataStore}
+ * @throws MetadataStoreException propagated from the delegate
+ */
public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
- return PulsarResources.createMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(),
+ return PulsarResources.createConfigMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(),
proxyConfig.getMetadataStoreSessionTimeoutMillis());
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index f6a4771f9e4..852ed7f18e8 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -74,7 +74,7 @@ public class PulsarConnectorCache {
private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
this.metadataStore = MetadataStoreExtended.create(pulsarConnectorConfig.getZookeeperUri(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
StatsProvider.class, getClass().getClassLoader());
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 73987c8b4a8..ab30e7dd4ec 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -193,7 +193,7 @@ public class ManagedLedgerWriter {
@Cleanup
MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.metadataStoreUrl,
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkConf, mlFactoryConf);
ManagedLedgerConfig mlConf = new ManagedLedgerConfig();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index f8b63e0641f..7e798eecfdd 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -106,7 +106,7 @@ public class WebSocketService implements Closeable {
if (isNotBlank(config.getConfigurationMetadataStoreUrl())) {
try {
- configMetadataStore = createMetadataStore(config.getConfigurationMetadataStoreUrl(),
+ configMetadataStore = createConfigMetadataStore(config.getConfigurationMetadataStoreUrl(),
(int) config.getMetadataStoreSessionTimeoutMillis());
} catch (MetadataStoreException e) {
throw new PulsarServerException(e);
@@ -140,9 +140,9 @@ public class WebSocketService implements Closeable {
log.info("Pulsar WebSocket Service started");
}
- public MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs)
+ /**
+ * Creates the configuration metadata store by delegating to
+ * {@code PulsarResources.createConfigMetadataStore}.
+ *
+ * @param serverUrls metadata store URL passed through to the delegate
+ * @param sessionTimeoutMs session timeout, in milliseconds, passed through to the delegate
+ * @return the store returned by {@code PulsarResources.createConfigMetadataStore}
+ * @throws MetadataStoreException propagated from the delegate
+ */
+ public MetadataStoreExtended createConfigMetadataStore(String serverUrls, int sessionTimeoutMs)
throws MetadataStoreException {
- return PulsarResources.createMetadataStore(serverUrls, sessionTimeoutMs);
+ return PulsarResources.createConfigMetadataStore(serverUrls, sessionTimeoutMs);
}
@Override