Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/08 00:53:30 UTC
[pulsar] branch master updated: Expose managed ledger bookie client
metric to prometheus (#6814)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b983abf Expose managed ledger bookie client metric to prometheus (#6814)
b983abf is described below
commit b983abf6e2648334bf5af21f4aa2227baf4b8b37
Author: hangc0276 <ha...@163.com>
AuthorDate: Mon Jun 8 08:53:18 2020 +0800
Expose managed ledger bookie client metric to prometheus (#6814)
### Motivation
Pulsar uses BookKeeper as its distributed log storage and initializes a bookie client to read and write data from and to BookKeeper. However, the Pulsar bookie client uses the default `NullStatsLogger.INSTANCE` for its runtime metrics, so none of them are exposed to Prometheus or any other stats backend. When tuning Pulsar bookie client performance, we don't have any bookie client metrics that show where the bottleneck is.
### Changes
I implemented a Prometheus stats provider, used it to trace bookie client runtime metrics, and exposed those metrics to Prometheus.
---
conf/broker.conf | 9 +
conf/standalone.conf | 9 +
.../bookkeeper/mledger/ManagedLedgerFactory.java | 1 +
.../mledger/ManagedLedgerFactoryConfig.java | 15 ++
.../mledger/impl/ManagedLedgerFactoryImpl.java | 30 +++-
.../apache/pulsar/broker/ServiceConfiguration.java | 19 ++
.../pulsar/broker/BookKeeperClientFactory.java | 5 +
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 13 +-
.../pulsar/broker/ManagedLedgerClientFactory.java | 28 ++-
.../prometheus/PrometheusMetricsGenerator.java | 21 +++
.../metrics/DataSketchesOpStatsLogger.java | 199 +++++++++++++++++++++
.../metrics/PrometheusMetricsProvider.java | 129 +++++++++++++
.../prometheus/metrics/PrometheusStatsLogger.java | 76 ++++++++
.../metrics/PrometheusTextFormatUtil.java | 156 ++++++++++++++++
.../prometheus/metrics/SimpleGauge.java} | 28 ++-
.../broker/MockedBookKeeperClientFactory.java | 8 +
.../broker/auth/MockedPulsarServiceBaseTest.java | 11 ++
.../pulsar/broker/stats/PrometheusMetricsTest.java | 44 +++++
site2/docs/reference-metrics.md | 21 ++-
19 files changed, 797 insertions(+), 25 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 523f7ad..05c2747 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -691,6 +691,9 @@ bookkeeperDiskWeightBasedPlacementEnabled=false
# A value of '0' disables sending any explicit LACs. Default is 0.
bookkeeperExplicitLacIntervalInMills=0
+# Expose BookKeeper client managed ledger stats to Prometheus. Default is false.
+# bookkeeperClientExposeStatsToPrometheus=false
+
### --- Managed Ledger --- ###
# Number of bookies to use when creating a ledger
@@ -793,6 +796,12 @@ managedLedgerReadEntryTimeoutSeconds=0
# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=0
+# Managed ledger prometheus stats latency rollover seconds (default: 60s)
+managedLedgerPrometheusStatsLatencyRolloverSeconds=60
+
+# Whether to trace managed ledger task execution time
+managedLedgerTraceTaskExecution=true
+
# New entries check delay for the cursor under the managed ledger.
# If no new messages in the topic, the cursor will try to check again after the delay time.
# For consumption latency sensitive scenario, can set to a smaller value or set to 0.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 79b520b..2efc651 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -473,6 +473,9 @@ bookkeeperDiskWeightBasedPlacementEnabled=false
# A value of '0' disables sending any explicit LACs. Default is 0.
bookkeeperExplicitLacIntervalInMills=0
+# Expose BookKeeper client managed ledger stats to Prometheus. Default is false.
+# bookkeeperClientExposeStatsToPrometheus=false
+
### --- Managed Ledger --- ###
# Number of bookies to use when creating a ledger
@@ -576,6 +579,12 @@ managedLedgerNewEntriesCheckDelayInMillis=10
# Use Open Range-Set to cache unacked messages
managedLedgerUnackedRangesOpenCacheSetEnabled=true
+# Managed ledger prometheus stats latency rollover seconds (default: 60s)
+managedLedgerPrometheusStatsLatencyRolloverSeconds=60
+
+# Whether to trace managed ledger task execution time
+managedLedgerTraceTaskExecution=true
+
### --- Load balancer --- ###
loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index 8e94e95..7e1f086 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
+import org.apache.bookkeeper.stats.StatsProvider;
/**
* A factory to open/create managed ledgers and delete them.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 4d1eb31..e42befc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -51,4 +51,19 @@ public class ManagedLedgerFactoryConfig {
* Whether we should make a copy of the entry payloads when inserting in cache
*/
private boolean copyEntriesInCache = false;
+
+ /**
+ * Whether to trace managed ledger task execution time.
+ */
+ private boolean traceTaskExecution = true;
+
+ /**
+ * Managed ledger Prometheus stats latency rollover period, in seconds.
+ */
+ private int prometheusStatsLatencyRolloverSeconds = 60;
+
+ /**
+ * Cluster name for Prometheus stats.
+ */
+ private String clusterName;
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 36bae4c..517c62c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -76,6 +76,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -124,7 +126,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConfiguration,
ManagedLedgerFactoryConfig config) throws Exception {
- this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* isBookkeeperManaged */, zkc, config);
+ this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* isBookkeeperManaged */,
+ zkc, config, NullStatsLogger.INSTANCE);
}
private ManagedLedgerFactoryImpl(ClientConfiguration clientConfiguration, String zkConnection, ManagedLedgerFactoryConfig config) throws Exception {
@@ -132,7 +135,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
true,
ZooKeeperClient.newBuilder()
.connectString(zkConnection)
- .sessionTimeoutMs(clientConfiguration.getZkTimeout()).build(), config);
+ .sessionTimeoutMs(clientConfiguration.getZkTimeout()).build(), config, NullStatsLogger.INSTANCE);
}
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
@@ -141,22 +144,35 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
throws Exception {
- this((policyConfig) -> bookKeeper, false /* isBookkeeperManaged */, zooKeeper, config);
+ this((policyConfig) -> bookKeeper, false /* isBookkeeperManaged */,
+ zooKeeper, config, NullStatsLogger.INSTANCE);
}
- public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
+ public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
+ ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
throws Exception {
- this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config);
+ this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config, NullStatsLogger.INSTANCE);
}
- private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, boolean isBookkeeperManaged, ZooKeeper zooKeeper,
- ManagedLedgerFactoryConfig config) throws Exception {
+ public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
+ ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config, StatsLogger statsLogger)
+ throws Exception {
+ this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config, statsLogger);
+ }
+
+ private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
+ boolean isBookkeeperManaged, ZooKeeper zooKeeper,
+ ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception {
scheduledExecutor = OrderedScheduler.newSchedulerBuilder()
.numThreads(config.getNumManagedLedgerSchedulerThreads())
+ .statsLogger(statsLogger)
+ .traceTaskExecution(config.isTraceTaskExecution())
.name("bookkeeper-ml-scheduler")
.build();
orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumManagedLedgerWorkerThreads())
+ .statsLogger(statsLogger)
+ .traceTaskExecution(config.isTraceTaskExecution())
.name("bookkeeper-ml-workers")
.build();
cacheEvictionExecutor = Executors
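Both managed ledger executors now receive the `statsLogger` and `config.isTraceTaskExecution()`. What tracing records internally is up to BookKeeper's `OrderedScheduler`/`OrderedExecutor`; purely as an illustration, the sketch below assumes tracing means wrapping each submitted task and timing its run. `TracingExecutor` and `TaskTimeRecorder` are hypothetical names, not BookKeeper classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for an OpStatsLogger: counts tasks and sums their run time. */
final class TaskTimeRecorder {
    final AtomicLong count = new AtomicLong();
    final AtomicLong totalNanos = new AtomicLong();

    void record(long elapsedNanos) {
        count.incrementAndGet();
        totalNanos.addAndGet(elapsedNanos);
    }
}

/** Hypothetical wrapper: when tracing is enabled, each task's run time is recorded. */
final class TracingExecutor {
    private final ExecutorService delegate;
    private final TaskTimeRecorder recorder;
    private final boolean traceTaskExecution;

    TracingExecutor(ExecutorService delegate, TaskTimeRecorder recorder, boolean traceTaskExecution) {
        this.delegate = delegate;
        this.recorder = recorder;
        this.traceTaskExecution = traceTaskExecution;
    }

    void execute(Runnable task) {
        if (!traceTaskExecution) {
            delegate.execute(task); // tracing off: submit the task unchanged
            return;
        }
        delegate.execute(() -> {
            long start = System.nanoTime();
            try {
                task.run();
            } finally {
                // Record the elapsed time even if the task throws.
                recorder.record(System.nanoTime() - start);
            }
        });
    }

    /** Stops accepting tasks and waits for queued ones; returns true if all of them finished. */
    boolean shutdownAndAwait() {
        delegate.shutdown();
        try {
            return delegate.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With the flag off, tasks are submitted as-is, so an untraced executor does no extra timing work.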
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b91fe0a..340a19d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1029,6 +1029,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the interval to check the need for sending an explicit LAC")
private int bookkeeperExplicitLacIntervalInMills = 0;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Whether to expose managed ledger client stats to Prometheus"
+ )
+ private boolean bookkeeperClientExposeStatsToPrometheus = false;
+
/**** --- Managed Ledger --- ****/
@FieldContext(
minValue = 1,
@@ -1231,6 +1237,19 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)")
private long managedLedgerAddEntryTimeoutSeconds = 0;
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Managed ledger prometheus stats latency rollover seconds"
+ )
+ private int managedLedgerPrometheusStatsLatencyRolloverSeconds = 60;
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Whether to trace managed ledger task execution time"
+ )
+ private boolean managedLedgerTraceTaskExecution = true;
+
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "New entries check delay for the cursor under the managed ledger. \n"
+ "If no new messages in the topic, the cursor will try to check again after the delay time. \n"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
index c0c43ab..476857a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.zookeeper.ZooKeeper;
@@ -35,5 +36,9 @@ public interface BookKeeperClientFactory {
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties) throws IOException;
+ BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> ensemblePlacementPolicyProperties,
+ StatsLogger statsLogger) throws IOException;
void close();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 37e8494..26f56d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -37,6 +37,8 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
@@ -54,7 +56,15 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
- Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, Map<String, Object> properties) throws IOException {
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties) throws IOException {
+ return create(conf, zkClient, ensemblePlacementPolicyClass, properties, NullStatsLogger.INSTANCE);
+ }
+
+ @Override
+ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties, StatsLogger statsLogger) throws IOException {
ClientConfiguration bkConf = createBkClientConfiguration(conf);
if (properties != null) {
properties.forEach((key, value) -> bkConf.setProperty(key, value));
@@ -67,6 +77,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
try {
return BookKeeper.forConfig(bkConf)
.allocator(PulsarByteBufAllocator.DEFAULT)
+ .statsLogger(statsLogger)
.build();
} catch (InterruptedException | BKException e) {
throw new IOException(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 09fea0c..d96b32b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -25,12 +25,18 @@ import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.Configuration;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +51,7 @@ public class ManagedLedgerClientFactory implements Closeable {
private final ManagedLedgerFactory managedLedgerFactory;
private final BookKeeper defaultBkClient;
private final Map<EnsemblePlacementPolicyConfig, BookKeeper> bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();
+ private StatsProvider statsProvider = new NullStatsProvider();
public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient,
BookKeeperClientFactory bookkeeperProvider) throws Exception {
@@ -56,6 +63,19 @@ public class ManagedLedgerClientFactory implements Closeable {
managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
+ managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+ managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
+
+ Configuration configuration = new ClientConfiguration();
+ if (conf.isBookkeeperClientExposeStatsToPrometheus()) {
+ configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+ conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+ configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, conf.getClusterName());
+ statsProvider = new PrometheusMetricsProvider();
+ }
+
+ statsProvider.start(configuration);
+ StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_managedLedger_client");
this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, Optional.empty(), null);
@@ -80,7 +100,7 @@ public class ManagedLedgerClientFactory implements Closeable {
return bkClient != null ? bkClient : defaultBkClient;
};
- this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkFactory, zkClient, managedLedgerFactoryConfig);
+ this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkFactory, zkClient, managedLedgerFactoryConfig, statsLogger);
}
public ManagedLedgerFactory getManagedLedgerFactory() {
@@ -91,16 +111,22 @@ public class ManagedLedgerClientFactory implements Closeable {
return defaultBkClient;
}
+ public StatsProvider getStatsProvider() {
+ return statsProvider;
+ }
+
@VisibleForTesting
public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookKeeperMap() {
return bkEnsemblePolicyToBkClientMap;
}
+ @Override
public void close() throws IOException {
try {
managedLedgerFactory.shutdown();
log.info("Closed managed ledger factory");
+ statsProvider.stop();
try {
defaultBkClient.close();
} catch (RejectedExecutionException ree) {
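`ManagedLedgerClientFactory` starts with a `NullStatsProvider` and switches to a `PrometheusMetricsProvider` only when `bookkeeperClientExposeStatsToPrometheus` is enabled, passing the rollover period and cluster name as configuration properties. The provider is started before the clients are built and stopped in `close()`. The sketch below mirrors that lifecycle with hypothetical `Provider`, `NullProvider`, `RecordingProvider`, and `ClientStatsOwner` types and a plain `Map` standing in for the Commons `Configuration`; only the two property keys and their defaults are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, trimmed-down provider contract: just the lifecycle. */
interface Provider {
    void start(Map<String, Object> conf);
    void stop();
}

/** Stand-in for NullStatsProvider: starting and stopping do nothing. */
final class NullProvider implements Provider {
    @Override public void start(Map<String, Object> conf) { }
    @Override public void stop() { }
}

/** Stand-in for PrometheusMetricsProvider: reads the two properties set by the factory. */
final class RecordingProvider implements Provider {
    int rolloverSeconds;
    String cluster;
    boolean running;

    @Override
    public void start(Map<String, Object> conf) {
        // Same keys and defaults as PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS and CLUSTER_NAME.
        rolloverSeconds = (Integer) conf.getOrDefault("prometheusStatsLatencyRolloverSeconds", 60);
        cluster = (String) conf.getOrDefault("cluster", "pulsar");
        running = true;
    }

    @Override
    public void stop() { running = false; }
}

/** Hypothetical owner mirroring the choose/start/stop flow of ManagedLedgerClientFactory. */
final class ClientStatsOwner implements AutoCloseable {
    final Provider provider;

    ClientStatsOwner(boolean exposeStatsToPrometheus, int rolloverSeconds, String clusterName) {
        Map<String, Object> conf = new HashMap<>();
        Provider chosen = new NullProvider();
        if (exposeStatsToPrometheus) {
            conf.put("prometheusStatsLatencyRolloverSeconds", rolloverSeconds);
            conf.put("cluster", clusterName);
            chosen = new RecordingProvider();
        }
        chosen.start(conf); // the null provider is started too; it simply does nothing
        this.provider = chosen;
    }

    @Override
    public void close() { provider.stop(); }
}
```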
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 5796c8e..745eef8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,8 +24,12 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.io.StringWriter;
+import java.io.Writer;
import java.util.Enumeration;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.pulsar.broker.PulsarService;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
@@ -85,6 +89,8 @@ public class PrometheusMetricsGenerator {
generateBrokerBasicMetrics(pulsar, stream);
+ generateManagedLedgerBookieClientMetrics(pulsar, stream);
+
out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
} finally {
buf.release();
@@ -155,6 +161,21 @@ public class PrometheusMetricsGenerator {
}
}
+ private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsar, SimpleTextOutputStream stream) {
+ StatsProvider statsProvider = pulsar.getManagedLedgerClientFactory().getStatsProvider();
+ if (statsProvider instanceof NullStatsProvider) {
+ return;
+ }
+
+ try {
+ Writer writer = new StringWriter();
+ statsProvider.writeAllMetrics(writer);
+ stream.write(writer.toString());
+ } catch (IOException e) {
+ // nop
+ }
+ }
+
private static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) {
Enumeration<MetricFamilySamples> metricFamilySamples = CollectorRegistry.defaultRegistry.metricFamilySamples();
while (metricFamilySamples.hasMoreElements()) {
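`generateManagedLedgerBookieClientMetrics` runs after `generateBrokerBasicMetrics`: it returns early for a `NullStatsProvider`, otherwise it renders every metric into a `StringWriter` and copies the text into the same output stream, so the bookie client metrics appear in the same scrape as the broker's own. A simplified sketch, assuming a hypothetical `MetricsSource` interface, a `StringBuilder` in place of `SimpleTextOutputStream`, and a made-up metric line in the usage check:

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

/** Hypothetical slice of the StatsProvider contract used by the generator. */
interface MetricsSource {
    void writeAllMetrics(Writer writer) throws IOException;
}

final class ScrapeSketch {
    /** Stand-in for a NullStatsProvider: nothing to export. */
    static final MetricsSource NONE = writer -> { };

    /** Appends client metrics after whatever the broker has already written to {@code out}. */
    static void appendClientMetrics(MetricsSource source, StringBuilder out) {
        if (source == NONE) {
            return; // mirrors the instanceof NullStatsProvider early return
        }
        StringWriter writer = new StringWriter();
        try {
            source.writeAllMetrics(writer);
            out.append(writer.toString()); // copied only once rendering succeeded
        } catch (IOException e) {
            // Best effort, as in the diff: a failure drops the client metrics, not the whole scrape.
        }
    }
}
```

Rendering into an in-memory buffer first means a failure partway through never leaves half-written client metrics in the response.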
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
new file mode 100644
index 0000000..3ef453d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import com.yahoo.sketches.quantiles.DoublesSketch;
+import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
+import com.yahoo.sketches.quantiles.DoublesUnion;
+import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.bookkeeper.stats.OpStatsData;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.StampedLock;
+
+/**
+ * OpStatsLogger implementation that uses DataSketches library to calculate the approximated latency quantiles.
+ */
+public class DataSketchesOpStatsLogger implements OpStatsLogger {
+
+ /**
+ * Use two rotating thread-local accessors so that we can safely swap them.
+ */
+ private volatile ThreadLocalAccessor current;
+ private volatile ThreadLocalAccessor replacement;
+
+ /**
+ * These are the sketches where all the aggregated results are published.
+ */
+ private volatile DoublesSketch successResult;
+ private volatile DoublesSketch failResult;
+
+ private final LongAdder successCountAdder = new LongAdder();
+ private final LongAdder failCountAdder = new LongAdder();
+
+ private final LongAdder successSumAdder = new LongAdder();
+ private final LongAdder failSumAdder = new LongAdder();
+
+ DataSketchesOpStatsLogger() {
+ this.current = new ThreadLocalAccessor();
+ this.replacement = new ThreadLocalAccessor();
+ }
+
+ @Override
+ public void registerFailedEvent(long eventLatency, TimeUnit unit) {
+ double valueMillis = unit.toMicros(eventLatency) / 1000.0;
+
+ failCountAdder.increment();
+ failSumAdder.add((long) valueMillis);
+
+ LocalData localData = current.localData.get();
+
+ long stamp = localData.lock.readLock();
+ try {
+ localData.failSketch.update(valueMillis);
+ } finally {
+ localData.lock.unlockRead(stamp);
+ }
+ }
+
+ @Override
+ public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) {
+ double valueMillis = unit.toMicros(eventLatency) / 1000.0;
+
+ successCountAdder.increment();
+ successSumAdder.add((long) valueMillis);
+
+ LocalData localData = current.localData.get();
+
+ long stamp = localData.lock.readLock();
+ try {
+ localData.successSketch.update(valueMillis);
+ } finally {
+ localData.lock.unlockRead(stamp);
+ }
+ }
+
+ @Override
+ public void registerSuccessfulValue(long value) {
+ successCountAdder.increment();
+ successSumAdder.add(value);
+
+ LocalData localData = current.localData.get();
+
+ long stamp = localData.lock.readLock();
+ try {
+ localData.successSketch.update(value);
+ } finally {
+ localData.lock.unlockRead(stamp);
+ }
+ }
+
+ @Override
+ public void registerFailedValue(long value) {
+ failCountAdder.increment();
+ failSumAdder.add(value);
+
+ LocalData localData = current.localData.get();
+
+ long stamp = localData.lock.readLock();
+ try {
+ localData.failSketch.update(value);
+ } finally {
+ localData.lock.unlockRead(stamp);
+ }
+ }
+
+ @Override
+ public OpStatsData toOpStatsData() {
+ // Not relevant as we don't use JMX here
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ // Not relevant as we don't use JMX here
+ throw new UnsupportedOperationException();
+ }
+
+ public void rotateLatencyCollection() {
+ // Swap current with replacement
+ ThreadLocalAccessor local = current;
+ current = replacement;
+ replacement = local;
+
+ final DoublesUnion aggregateSuccess = new DoublesUnionBuilder().build();
+ final DoublesUnion aggregateFail = new DoublesUnionBuilder().build();
+ local.map.forEach((localData, b) -> {
+ long stamp = localData.lock.writeLock();
+ try {
+ aggregateSuccess.update(localData.successSketch);
+ localData.successSketch.reset();
+ aggregateFail.update(localData.failSketch);
+ localData.failSketch.reset();
+ } finally {
+ localData.lock.unlockWrite(stamp);
+ }
+ });
+
+ successResult = aggregateSuccess.getResultAndReset();
+ failResult = aggregateFail.getResultAndReset();
+ }
+
+ public long getCount(boolean success) {
+ return success ? successCountAdder.sum() : failCountAdder.sum();
+ }
+
+ public long getSum(boolean success) {
+ return success ? successSumAdder.sum() : failSumAdder.sum();
+ }
+
+ public double getQuantileValue(boolean success, double quantile) {
+ DoublesSketch s = success ? successResult : failResult;
+ return s != null ? s.getQuantile(quantile) : Double.NaN;
+ }
+
+ private static class LocalData {
+ private final DoublesSketch successSketch = new DoublesSketchBuilder().build();
+ private final DoublesSketch failSketch = new DoublesSketchBuilder().build();
+ private final StampedLock lock = new StampedLock();
+ }
+
+ private static class ThreadLocalAccessor {
+ private final Map<LocalData, Boolean> map = new ConcurrentHashMap<>();
+ private final FastThreadLocal<LocalData> localData = new FastThreadLocal<LocalData>() {
+
+ @Override
+ protected LocalData initialValue() throws Exception {
+ LocalData localData = new LocalData();
+ map.put(localData, Boolean.TRUE);
+ return localData;
+ }
+
+ @Override
+ protected void onRemoval(LocalData value) throws Exception {
+ map.remove(value);
+ }
+ };
+ }
+}
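`DataSketchesOpStatsLogger` records each latency into a per-thread sketch guarded by a `StampedLock`: the recording thread takes the cheap read lock on its own data, while `rotateLatencyCollection()` swaps `current` and `replacement` and then takes each retired buffer's write lock to merge and reset it. The sketch below reproduces that rotation with plain JDK types. As assumptions made for self-containment, it stores raw samples and computes a nearest-rank quantile instead of using a DataSketches `DoublesSketch`, and uses `ThreadLocal` instead of Netty's `FastThreadLocal`; `RotatingLatencyRecorder` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

/** Simplified, hypothetical analogue of the double-buffered latency collection. */
final class RotatingLatencyRecorder {

    /** Per-thread buffer; only its owning thread appends, under the read lock. */
    private static final class LocalData {
        final List<Double> samples = new ArrayList<>();
        final StampedLock lock = new StampedLock();
    }

    /** One generation of per-thread buffers plus a registry so rotation can reach all of them. */
    private static final class Accessor {
        final Map<LocalData, Boolean> all = new ConcurrentHashMap<>();
        final ThreadLocal<LocalData> local = ThreadLocal.withInitial(() -> {
            LocalData data = new LocalData();
            all.put(data, Boolean.TRUE);
            return data;
        });
    }

    private volatile Accessor current = new Accessor();
    private volatile Accessor replacement = new Accessor();
    private volatile List<Double> lastWindow = Collections.emptyList();

    void registerSuccessfulEvent(long latency, TimeUnit unit) {
        // Same conversion as the diff: microsecond precision, reported in milliseconds.
        double millis = unit.toMicros(latency) / 1000.0;
        LocalData data = current.local.get();
        long stamp = data.lock.readLock();
        try {
            data.samples.add(millis);
        } finally {
            data.lock.unlockRead(stamp);
        }
    }

    /** Swaps generations, then drains every buffer of the retired one under its write lock. */
    synchronized void rotate() {
        Accessor retired = current;
        current = replacement;
        replacement = retired;

        List<Double> merged = new ArrayList<>();
        for (LocalData data : retired.all.keySet()) {
            long stamp = data.lock.writeLock();
            try {
                merged.addAll(data.samples);
                data.samples.clear();
            } finally {
                data.lock.unlockWrite(stamp);
            }
        }
        Collections.sort(merged);
        lastWindow = merged;
    }

    /** Nearest-rank quantile over the last completed window, or NaN if it was empty. */
    double quantile(double q) {
        List<Double> window = lastWindow;
        if (window.isEmpty()) {
            return Double.NaN;
        }
        int rank = (int) Math.ceil(q * window.size());
        return window.get(Math.min(window.size(), Math.max(1, rank)) - 1);
    }
}
```

A writer that fetched the old accessor just before the swap can still add its sample after the drain. Here, as in the diff, that sample is not lost: it stays in the retired generation and is counted when that generation is retired again, two rotations later.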
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
new file mode 100644
index 0000000..a8a05e1
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.prometheus.client.Collector;
+import org.apache.bookkeeper.stats.CachingStatsProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.stats.prometheus.LongAdderCounter;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.concurrent.*;
+
+/**
+ * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ */
+public class PrometheusMetricsProvider implements StatsProvider {
+ private ScheduledExecutorService executor;
+
+ public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
+ public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
+ public static final String CLUSTER_NAME = "cluster";
+ public static final String DEFAULT_CLUSTER_NAME = "pulsar";
+
+ private String cluster;
+ private final CachingStatsProvider cachingStatsProvider;
+
+ /**
+ * These act as a registry of the metrics defined in this provider.
+ */
+ final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
+ final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
+ final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+
+ public PrometheusMetricsProvider() {
+ this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
+ @Override
+ public void start(Configuration conf) {
+ // nop
+ }
+
+ @Override
+ public void stop() {
+ // nop
+ }
+
+ @Override
+ public StatsLogger getStatsLogger(String scope) {
+ return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
+ }
+
+ @Override
+ public String getStatsName(String... statsComponents) {
+ String completeName;
+ if (statsComponents.length == 0) {
+ return "";
+ } else if (statsComponents[0].isEmpty()) {
+ completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
+ } else {
+ completeName = StringUtils.join(statsComponents, '_');
+ }
+ return Collector.sanitizeMetricName(completeName);
+ }
+ });
+ }
+
+ @Override
+ public void start(Configuration conf) {
+ executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
+
+ int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+ DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
+ cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+
+ executor.scheduleAtFixedRate(() -> {
+ rotateLatencyCollection();
+ }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void stop() {
+ executor.shutdownNow();
+ }
+
+ @Override
+ public StatsLogger getStatsLogger(String scope) {
+ return this.cachingStatsProvider.getStatsLogger(scope);
+ }
+
+ @Override
+ public void writeAllMetrics(Writer writer) throws IOException {
+ gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
+ counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
+ opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster, opStatLogger));
+ }
+
+ @Override
+ public String getStatsName(String... statsComponents) {
+ return cachingStatsProvider.getStatsName(statsComponents);
+ }
+
+ @VisibleForTesting
+ void rotateLatencyCollection() {
+ opStats.forEach((name, metric) -> {
+ metric.rotateLatencyCollection();
+ });
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java
new file mode 100644
index 0000000..ad3c62f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import com.google.common.base.Joiner;
+import io.prometheus.client.Collector;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.prometheus.LongAdderCounter;
+
+/**
+ * A {@code Prometheus} based {@link StatsLogger} implementation.
+ */
+public class PrometheusStatsLogger implements StatsLogger {
+
+ private final PrometheusMetricsProvider provider;
+ private final String scope;
+
+ PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope) {
+ this.provider = provider;
+ this.scope = scope;
+ }
+
+ @Override
+ public OpStatsLogger getOpStatsLogger(String name) {
+ return provider.opStats.computeIfAbsent(completeName(name), x -> new DataSketchesOpStatsLogger());
+ }
+
+ @Override
+ public Counter getCounter(String name) {
+ return provider.counters.computeIfAbsent(completeName(name), x -> new LongAdderCounter());
+ }
+
+ @Override
+ public <T extends Number> void registerGauge(String name, Gauge<T> gauge) {
+ provider.gauges.computeIfAbsent(completeName(name), x -> new SimpleGauge<T>(gauge));
+ }
+
+ @Override
+ public <T extends Number> void unregisterGauge(String name, Gauge<T> gauge) {
+ // no-op
+ }
+
+ @Override
+ public void removeScope(String name, StatsLogger statsLogger) {
+ // no-op
+ }
+
+ @Override
+ public StatsLogger scope(String name) {
+ return new PrometheusStatsLogger(provider, completeName(name));
+ }
+
+ private String completeName(String name) {
+ String completeName = scope.isEmpty() ? name : Joiner.on('_').join(scope, name);
+ return Collector.sanitizeMetricName(completeName);
+ }
+}
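The naming rule in `PrometheusStatsLogger.completeName` (and the provider's `getStatsName`) flattens nested scopes into one metric name by joining with `_` and then sanitizing. The standalone sketch below illustrates that rule. Its `sanitize` helper is an assumption: it approximates the Prometheus metric-name rule `[a-zA-Z_:][a-zA-Z0-9_:]*` and is not claimed to be the exact implementation of `Collector.sanitizeMetricName`. The class name `MetricNameSketch` is hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical standalone sketch: how nested StatsLogger scopes could be
// flattened into a single Prometheus metric name.
public class MetricNameSketch {
    // Approximation (assumption) of the Prometheus name rule [a-zA-Z_:][a-zA-Z0-9_:]*
    private static final Pattern INVALID_FIRST = Pattern.compile("^[^a-zA-Z_:]");
    private static final Pattern INVALID_BODY = Pattern.compile("[^a-zA-Z0-9_:]");

    static String sanitize(String name) {
        // Replace an invalid leading character, then every other invalid character, with '_'
        String fixedFirst = INVALID_FIRST.matcher(name).replaceFirst("_");
        return INVALID_BODY.matcher(fixedFirst).replaceAll("_");
    }

    // Mirrors the shape of completeName: an empty scope adds no prefix.
    static String completeName(String scope, String name) {
        String joined = scope.isEmpty() ? name : scope + "_" + name;
        return sanitize(joined);
    }

    public static void main(String[] args) {
        // Equivalent to calling scope(...) twice and then asking for a metric name.
        String scope = completeName("", "bookkeeper-client");
        scope = completeName(scope, "ml.scheduler");
        System.out.println(completeName(scope, "completed_tasks"));
        // prints bookkeeper_client_ml_scheduler_completed_tasks
    }
}
```

Sanitizing at every level is idempotent for already-valid names, so re-sanitizing a scope that was sanitized earlier does not change it.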
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
new file mode 100644
index 0000000..abe0b56
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
+import org.apache.bookkeeper.stats.Counter;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Enumeration;
+
+/**
+ * Logic to write metrics in Prometheus text format.
+ */
+public class PrometheusTextFormatUtil {
+ static void writeGauge(Writer w, String name, String cluster, SimpleGauge<? extends Number> gauge) {
+ // Example:
+ // # TYPE pulsar_managedLedger_client_bookkeeper_ml_scheduler_completed_tasks_0 gauge
+ // pulsar_managedLedger_client_bookkeeper_ml_scheduler_completed_tasks_0{cluster="pulsar"} 1044057
+ try {
+ w.append("# TYPE ").append(name).append(" gauge\n");
+ w.append(name).append("{cluster=\"").append(cluster).append("\"}")
+ .append(' ').append(gauge.getSample().toString()).append('\n');
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static void writeCounter(Writer w, String name, String cluster, Counter counter) {
+ // Example:
+ // # TYPE jvm_threads_started_total counter
+ // jvm_threads_started_total{cluster="test"} 59
+ try {
+ w.append("# TYPE ").append(name).append(" counter\n");
+ w.append(name).append("{cluster=\"").append(cluster).append("\"}")
+ .append(' ').append(counter.get().toString()).append('\n');
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static void writeOpStat(Writer w, String name, String cluster, DataSketchesOpStatsLogger opStat) {
+ // Example:
+ // # TYPE pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued summary
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.5"} NaN
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.75"} NaN
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.95"} NaN
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.99"} NaN
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.999"} NaN
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.9999"} NaN
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="1.0"} -Infinity
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued_count{cluster="pulsar", success="false"} 0
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued_sum{cluster="pulsar", success="false"} 0.0
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.5"} 0.031
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.75"} 0.043
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.95"} 0.061
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.99"} 0.064
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.999"} 0.073
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.9999"} 0.073
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="1.0"} 0.552
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued_count{cluster="pulsar", success="true"} 40911432
+ // pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued_sum{cluster="pulsar", success="true"} 527.0
+ try {
+ w.append("# TYPE ").append(name).append(" summary\n");
+ writeQuantile(w, opStat, name, cluster, false, 0.5);
+ writeQuantile(w, opStat, name, cluster, false, 0.75);
+ writeQuantile(w, opStat, name, cluster, false, 0.95);
+ writeQuantile(w, opStat, name, cluster, false, 0.99);
+ writeQuantile(w, opStat, name, cluster, false, 0.999);
+ writeQuantile(w, opStat, name, cluster, false, 0.9999);
+ writeQuantile(w, opStat, name, cluster, false, 1.0);
+ writeCount(w, opStat, name, cluster, false);
+ writeSum(w, opStat, name, cluster, false);
+
+ writeQuantile(w, opStat, name, cluster, true, 0.5);
+ writeQuantile(w, opStat, name, cluster, true, 0.75);
+ writeQuantile(w, opStat, name, cluster, true, 0.95);
+ writeQuantile(w, opStat, name, cluster, true, 0.99);
+ writeQuantile(w, opStat, name, cluster, true, 0.999);
+ writeQuantile(w, opStat, name, cluster, true, 0.9999);
+ writeQuantile(w, opStat, name, cluster, true, 1.0);
+ writeCount(w, opStat, name, cluster, true);
+ writeSum(w, opStat, name, cluster, true);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void writeQuantile(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster,
+ Boolean success, double quantile) throws IOException {
+ w.append(name).append("{cluster=\"").append(cluster).append("\", success=\"")
+ .append(success.toString()).append("\", quantile=\"")
+ .append(Double.toString(quantile)).append("\"} ")
+ .append(Double.toString(opStat.getQuantileValue(success, quantile))).append('\n');
+ }
+
+ private static void writeCount(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster,
+ Boolean success) throws IOException {
+ w.append(name).append("_count{cluster=\"").append(cluster).append("\", success=\"")
+ .append(success.toString()).append("\"} ")
+ .append(Long.toString(opStat.getCount(success))).append('\n');
+ }
+
+ private static void writeSum(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster,
+ Boolean success) throws IOException {
+ w.append(name).append("_sum{cluster=\"").append(cluster).append("\", success=\"")
+ .append(success.toString()).append("\"} ")
+ .append(Double.toString(opStat.getSum(success))).append('\n');
+ }
+
+ static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry) throws IOException {
+ Enumeration<MetricFamilySamples> metricFamilySamples = registry.metricFamilySamples();
+ while (metricFamilySamples.hasMoreElements()) {
+ MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
+
+ for (int i = 0; i < metricFamily.samples.size(); i++) {
+ Sample sample = metricFamily.samples.get(i);
+ w.write(sample.name);
+ w.write('{');
+ for (int j = 0; j < sample.labelNames.size(); j++) {
+ if (j != 0) {
+ w.write(", ");
+ }
+ w.write(sample.labelNames.get(j));
+ w.write("=\"");
+ w.write(sample.labelValues.get(j));
+ w.write('"');
+ }
+
+ w.write("} ");
+ w.write(Collector.doubleToGoString(sample.value));
+ w.write('\n');
+ }
+ }
+ }
+}
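For readers unfamiliar with the Prometheus text exposition format, the following standalone sketch shows the shape of what `writeOpStat` emits for one value of the `success` label: one sample per quantile, then a `_count` and a `_sum` sample. The quantile values are passed in precomputed, because the patch obtains them from `DataSketchesOpStatsLogger`, which is not reproduced here. The class and method names (`SummaryFormatSketch`, `writeSummaryBlock`) are hypothetical, and the label layout follows `writeQuantile`, `writeCount` and `writeSum` as written in the patch.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

// Hypothetical sketch of one summary block in Prometheus text format.
public class SummaryFormatSketch {
    static void writeSummaryBlock(Writer w, String name, String cluster, boolean success,
                                  double[] quantiles, double[] values, long count, double sum)
            throws IOException {
        if (quantiles.length != values.length) {
            throw new IllegalArgumentException("quantiles and values must have the same length");
        }
        // One sample per quantile, labelled with cluster, success and quantile
        for (int i = 0; i < quantiles.length; i++) {
            w.append(name).append("{cluster=\"").append(cluster)
                .append("\", success=\"").append(Boolean.toString(success))
                .append("\", quantile=\"").append(Double.toString(quantiles[i])).append("\"} ")
                .append(Double.toString(values[i])).append('\n');
        }
        // Number of observations in this block
        w.append(name).append("_count{cluster=\"").append(cluster)
            .append("\", success=\"").append(Boolean.toString(success)).append("\"} ")
            .append(Long.toString(count)).append('\n');
        // Sum of all observed values in this block
        w.append(name).append("_sum{cluster=\"").append(cluster)
            .append("\", success=\"").append(Boolean.toString(success)).append("\"} ")
            .append(Double.toString(sum)).append('\n');
    }

    public static void main(String[] args) throws IOException {
        StringWriter out = new StringWriter();
        out.append("# TYPE example_latency summary\n");
        writeSummaryBlock(out, "example_latency", "pulsar", true,
                new double[] {0.5, 0.99}, new double[] {0.031, 0.064}, 42, 1.5);
        System.out.print(out);
    }
}
```

The `# TYPE ... summary` line is written once per metric name, and the `success="false"` and `success="true"` blocks then share that single type declaration.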
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java
similarity index 55%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java
index c0c43ab..a93a26c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java
@@ -16,24 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker;
+package org.apache.pulsar.broker.stats.prometheus.metrics;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.bookkeeper.stats.Gauge;
/**
- * Provider of a new BookKeeper client instance
+ * A thin wrapper around a BookKeeper {@link Gauge} that forwards sample reads to it.
*/
-public interface BookKeeperClientFactory {
- BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
- Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
- Map<String, Object> ensemblePlacementPolicyProperties) throws IOException;
+public class SimpleGauge<T extends Number> {
+
+ private final Gauge<T> gauge;
+
+ public SimpleGauge(final Gauge<T> gauge) {
+ this.gauge = gauge;
+ }
- void close();
+ Number getSample() {
+ return gauge.getSample();
+ }
}
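`PrometheusStatsLogger.registerGauge` uses `computeIfAbsent`, and `unregisterGauge` is a no-op. As a result, the first gauge registered under a name stays in the registry, and its value is read lazily through `SimpleGauge.getSample()` each time metrics are written. The sketch below illustrates that behavior. `SampleSource` is a hypothetical, simplified stand-in for BookKeeper's `Gauge`, and `GaugeRegistrySketch` is likewise hypothetical.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of first-registration-wins gauge semantics.
public class GaugeRegistrySketch {
    // Simplified stand-in (assumption) for org.apache.bookkeeper.stats.Gauge
    interface SampleSource {
        Number getSample();
    }

    // Sorted map, like the provider's registries, so output is ordered by name
    final ConcurrentMap<String, SampleSource> gauges = new ConcurrentSkipListMap<>();

    void registerGauge(String name, SampleSource gauge) {
        // First registration wins; later registrations under the same name are ignored
        gauges.computeIfAbsent(name, x -> gauge);
    }

    String scrape() {
        StringBuilder sb = new StringBuilder();
        // getSample() is evaluated at scrape time, not at registration time
        gauges.forEach((name, g) -> sb.append(name).append(' ').append(g.getSample()).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        GaugeRegistrySketch registry = new GaugeRegistrySketch();
        AtomicLong queued = new AtomicLong();
        registry.registerGauge("queue_0", queued::get);
        registry.registerGauge("queue_0", () -> -1); // ignored: name already registered
        queued.set(7);
        System.out.print(registry.scrape()); // prints "queue_0 7"
    }
}
```

One consequence of this design is that a component re-created under the same scope keeps reporting through the gauge registered by its first instance.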
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
index 4b7f49b..7c66ccc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
@@ -31,6 +31,7 @@ import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +62,13 @@ public class MockedBookKeeperClientFactory implements BookKeeperClientFactory {
}
@Override
+ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties, StatsLogger statsLogger) throws IOException {
+ return mockedBk;
+ }
+
+ @Override
public void close() {
try {
mockedBk.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c1e39b4..fc0c66d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.auth;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
@@ -44,6 +45,7 @@ import java.util.function.Supplier;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarService;
@@ -106,6 +108,7 @@ public abstract class MockedPulsarServiceBaseTest {
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setWebServicePort(Optional.of(0));
this.conf.setWebServicePortTls(Optional.of(0));
+ this.conf.setBookkeeperClientExposeStatsToPrometheus(true);
}
protected final void internalSetup() throws Exception {
@@ -339,6 +342,14 @@ public abstract class MockedPulsarServiceBaseTest {
}
@Override
+ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties, StatsLogger statsLogger) {
+ // Always return the same instance (so that we don't lose the mock BK content on broker restart)
+ return mockBookKeeper;
+ }
+
+ @Override
public void close() {
// no-op
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 948bdbd..8d4cef4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -471,6 +471,50 @@ public class PrometheusMetricsTest extends BrokerTestBase {
p2.close();
}
+ @Test
+ public void testManagedLedgerBookieClientStats() throws Exception {
+ Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+ Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ p1.send(message.getBytes());
+ p2.send(message.getBytes());
+ }
+
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+ String metricsStr = new String(statsOut.toByteArray());
+
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+ metrics.entries().forEach(e ->
+ System.out.println(e.getKey() + ": " + e.getValue())
+ );
+
+ List<Metric> cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_scheduler_completed_tasks_0");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+ cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_scheduler_queue_0");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+ cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_scheduler_total_tasks_0");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+ cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_workers_completed_tasks_0");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+ cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_workers_task_execution_count");
+ assertEquals(cm.size(), 2);
+ assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+ p1.close();
+ p2.close();
+ }
+
/**
* Hacky parsing of Prometheus text format. Should be good enough for unit tests
*/
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 081a9df..ffabb31 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -110,6 +110,7 @@ Broker has the following kinds of metrics:
* [BundleSplit metrics](#bundlesplit-metrics)
* [Subscription metrics](#subscription-metrics)
* [Consumer metrics](#consumer-metrics)
+* [ManagedLedger bookie client metrics](#managed-ledger-bookie-client-metrics)
### Namespace metrics
@@ -324,6 +325,25 @@ All the consumer metrics are labelled with the following labels:
| pulsar_consumer_msg_throughput_out | Gauge | The total message dispatch throughput for a consumer (bytes/second). |
| pulsar_consumer_available_permits | Gauge | The available permits for a consumer. |
+### Managed ledger bookie client metrics
+
+All the managed ledger bookie client metrics are labelled with the following labels:
+
+- *cluster*: `cluster=${pulsar_cluster}`. `${pulsar_cluster}` is the cluster name that you configured in `broker.conf`.
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_managedLedger_client_bookkeeper_ml_scheduler_completed_tasks_* | Gauge | The number of tasks the scheduler executor has completed. <br>The number of metrics is determined by the number of scheduler executor threads configured by `managedLedgerNumSchedulerThreads` in `broker.conf`. |
+| pulsar_managedLedger_client_bookkeeper_ml_scheduler_queue_* | Gauge | The number of tasks queued in the scheduler executor's queue. <br>The number of metrics is determined by the number of scheduler executor threads configured by `managedLedgerNumSchedulerThreads` in `broker.conf`. |
+| pulsar_managedLedger_client_bookkeeper_ml_scheduler_total_tasks_* | Gauge | The total number of tasks the scheduler executor has received. <br>The number of metrics is determined by the number of scheduler executor threads configured by `managedLedgerNumSchedulerThreads` in `broker.conf`. |
+| pulsar_managedLedger_client_bookkeeper_ml_workers_completed_tasks_* | Gauge | The number of tasks the worker executor has completed. <br>The number of metrics is determined by the number of worker executor threads configured by `managedLedgerNumWorkerThreads` in `broker.conf`. |
+| pulsar_managedLedger_client_bookkeeper_ml_workers_queue_* | Gauge | The number of tasks queued in the worker executor's queue. <br>The number of metrics is determined by the number of worker executor threads configured by `managedLedgerNumWorkerThreads` in `broker.conf`. |
+| pulsar_managedLedger_client_bookkeeper_ml_workers_total_tasks_* | Gauge | The total number of tasks the worker executor has received. <br>The number of metrics is determined by the number of worker executor threads configured by `managedLedgerNumWorkerThreads` in `broker.conf`. |
+| pulsar_managedLedger_client_bookkeeper_ml_scheduler_task_execution | Summary | The scheduler task execution latency, in milliseconds. |
+| pulsar_managedLedger_client_bookkeeper_ml_scheduler_task_queued | Summary | The scheduler task queueing latency, in milliseconds. |
+| pulsar_managedLedger_client_bookkeeper_ml_workers_task_execution | Summary | The worker task execution latency, in milliseconds. |
+| pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued | Summary | The worker task queueing latency, in milliseconds. |
+
# Pulsar Functions
All the Pulsar Functions metrics are labelled with the following labels:
@@ -387,7 +407,6 @@ All the proxy metrics are labelled with the following labels:
| split_record_deserialize_time_per_query | Summary | Time spent on deserializing message to record per query. |
| split_total_execution_time | Summary | Total execution time. |
-
## Monitor
You can [set up a Prometheus instance](https://prometheus.io/) to collect all the metrics exposed at Pulsar components and set up