You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/04/25 06:46:56 UTC

[pulsar] branch master updated: expose managedLedgerCache, managedLedger, loadBalance metrics to prometheus (#6705)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a9ed984  expose managedLedgerCache, managedLedger, loadBalance metrics to prometheus (#6705)
a9ed984 is described below

commit a9ed984f4ccac656fe09f65009efa977cc4e033c
Author: hangc0276 <ha...@163.com>
AuthorDate: Sat Apr 25 14:46:41 2020 +0800

    expose managedLedgerCache, managedLedger, loadBalance metrics to prometheus (#6705)
    
    ## Motivation
    The managed ledger read cache metrics are exported via /admin/broker-stats/metrics in JSON format, which is hard to parse, collect and display. What's more, the read cache is a very important module for message consuming throughput and latency, so collecting and displaying the read cache metrics is extremely urgent for Pulsar in production.
    
    ## Changes
    I parse the JSON-format metrics into the Prometheus message type and export them to the Prometheus monitor port, so those metrics can be displayed in Grafana.
    
    Please help check those changes; if they are OK, I will update the metrics documentation.
---
 .../mledger/impl/ManagedLedgerMBeanImpl.java       |  2 +-
 .../prometheus/PrometheusMetricsGenerator.java     | 74 ++++++++++++++++++++++
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 65 +++++++++++++++++++
 site2/docs/reference-metrics.md                    | 49 ++++++++++++++
 4 files changed, 189 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 61055b5..47062e9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -30,7 +30,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
 
     public static final long[] ENTRY_LATENCY_BUCKETS_USEC = { 500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
             200_000, 1000_000 };
-    public static final long[] ENTRY_SIZE_BUCKETS_BYTES = { 128, 512, 1024, 2084, 4096, 16_384, 102_400, 1_232_896 };
+    public static final long[] ENTRY_SIZE_BUCKETS_BYTES = { 128, 512, 1024, 2048, 4096, 16_384, 102_400, 1_232_896 };
 
     private final ManagedLedgerImpl managedLedger;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index b225958..5796c8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -20,10 +20,18 @@ package org.apache.pulsar.broker.stats.prometheus;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.Enumeration;
 
 import org.apache.pulsar.broker.PulsarService;
 import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
+
+import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
+import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
+import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 import io.netty.buffer.ByteBuf;
@@ -75,12 +83,78 @@ public class PrometheusMetricsGenerator {
             FunctionsStatsGenerator.generate(pulsar.getWorkerService(),
                     pulsar.getConfiguration().getClusterName(), stream);
 
+            generateBrokerBasicMetrics(pulsar, stream);
+
             out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
         } finally {
             buf.release();
         }
     }
 
+    private static void generateBrokerBasicMetrics(PulsarService pulsar, SimpleTextOutputStream stream) {
+        String clusterName = pulsar.getConfiguration().getClusterName();
+        // generate managedLedgerCache metrics
+        parseMetricsToPrometheusMetrics(new ManagedLedgerCacheMetrics(pulsar).generate(),
+                clusterName, Collector.Type.GAUGE, stream);
+
+        // generate managedLedger metrics
+        parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(),
+                clusterName, Collector.Type.GAUGE, stream);
+
+        // generate loadBalance metrics
+        parseMetricsToPrometheusMetrics(pulsar.getLoadManager().get().getLoadBalancingMetrics(),
+                clusterName, Collector.Type.GAUGE, stream);
+    }
+
+    private static void parseMetricsToPrometheusMetrics(Collection<Metrics> metrics, String cluster,
+                                                        Collector.Type metricType, SimpleTextOutputStream stream) {
+        Set<String> names = new HashSet<>();
+        for (Metrics metrics1 : metrics) {
+            for (Map.Entry<String, Object> entry : metrics1.getMetrics().entrySet()) {
+                String value = null;
+                if (entry.getKey().contains(".")) {
+                    try {
+                        String key = entry.getKey();
+                        int dotIndex = key.indexOf(".");
+                        int nameIndex = key.substring(0, dotIndex).lastIndexOf("_");
+                        if (nameIndex == -1) {
+                            continue;
+                        }
+
+                        String name = key.substring(0, nameIndex);
+                        value = key.substring(nameIndex + 1);
+                        if (!names.contains(name)) {
+                            stream.write("# TYPE ").write(name.replace("brk_", "pulsar_")).write(' ')
+                                    .write(getTypeStr(metricType)).write("\n");
+                            names.add(name);
+                        }
+                        stream.write(name.replace("brk_", "pulsar_"))
+                                .write("{cluster=\"").write(cluster).write('"');
+                    } catch (Exception e) {
+                        continue;
+                    }
+                } else {
+                    stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ')
+                            .write(getTypeStr(metricType)).write('\n');
+                    stream.write(entry.getKey().replace("brk_", "pulsar_"))
+                            .write("{cluster=\"").write(cluster).write('"');
+                }
+
+                for (Map.Entry<String, String> metric : metrics1.getDimensions().entrySet()) {
+                    if (metric.getKey().isEmpty() || "cluster".equals(metric.getKey())) {
+                        continue;
+                    }
+                    stream.write(", ").write(metric.getKey()).write("=\"").write(metric.getValue()).write('"');
+                    if (value != null && !value.isEmpty()) {
+                        stream.write(", ").write("quantile=\"").write(value).write('"');
+                    }
+                }
+                stream.write("} ").write(String.valueOf(entry.getValue()))
+                        .write(' ').write(System.currentTimeMillis()).write("\n");
+            }
+        }
+    }
+
     private static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) {
         Enumeration<MetricFamilySamples> metricFamilySamples = CollectorRegistry.defaultRegistry.metricFamilySamples();
         while (metricFamilySamples.hasMoreElements()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index e569caf..b3246c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -243,6 +243,71 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         p2.close();
     }
 
+    @Test
+    public void testManagedLedgerCacheStats() throws Exception {
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        metrics.entries().forEach(e ->
+                System.out.println(e.getKey() + ": " + e.getValue())
+        );
+
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_cache_evictions");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+        cm = (List<Metric>) metrics.get("pulsar_ml_cache_hits_rate");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+        p1.close();
+        p2.close();
+    }
+
+    @Test
+    public void testManagedLedgerStats() throws Exception {
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        metrics.entries().forEach(e ->
+                System.out.println(e.getKey() + ": " + e.getValue())
+        );
+
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryBytesRate");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryMessagesRate");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        p1.close();
+        p2.close();
+    }
 
     /**
      * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 70c3120..11cdc21 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -100,6 +100,8 @@ Broker has the following kinds of metrics:
     * [Replication metrics](#replication-metrics)
 * [Topic metrics](#topic-metrics)
     * [Replication metrics](#replication-metrics-1)
+* [ManagedLedgerCache metrics](#managedledgercache-metrics)
+* [ManagedLedger metrics](#managedledger-metrics)
 * [LoadBalancing metrics](#loadbalancing-metrics)
     * [BundleUnloading metrics](#bundleunloading-metrics)
     * [BundleSplit metrics](#bundlesplit-metrics)
@@ -193,6 +195,53 @@ All the replication metrics will also be labelled with `remoteCluster=${pulsar_r
 | pulsar_replication_throughput_out | Gauge | The total throughput of the topic replicating to remote cluster (bytes/second). |
 | pulsar_replication_backlog | Gauge | The total backlog of the topic replicating to remote cluster (messages). |
 
+### ManagedLedgerCache metrics
+All the ManagedLedgerCache metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_ml_cache_evictions | Gauge | The number of cache evictions during the last minute. |
+| pulsar_ml_cache_hits_rate | Gauge | The number of cache hits per second. |
+| pulsar_ml_cache_hits_throughput | Gauge | The amount of data retrieved from the cache in byte/s |
+| pulsar_ml_cache_misses_rate | Gauge | The number of cache misses per second |
+| pulsar_ml_cache_misses_throughput | Gauge | The amount of data not retrieved from the cache (cache misses) in byte/s |
+| pulsar_ml_cache_pool_active_allocations | Gauge | The number of currently active allocations in direct arena |
+| pulsar_ml_cache_pool_active_allocations_huge | Gauge | The number of currently active huge allocations in direct arena |
+| pulsar_ml_cache_pool_active_allocations_normal | Gauge | The number of currently active normal allocations in direct arena |
+| pulsar_ml_cache_pool_active_allocations_small | Gauge | The number of currently active small allocations in direct arena |
+| pulsar_ml_cache_pool_active_allocations_tiny | Gauge | The number of currently active tiny allocations in direct arena |
+| pulsar_ml_cache_pool_allocated | Gauge | The total allocated memory of chunk lists in direct arena |
+| pulsar_ml_cache_pool_used | Gauge | The total used memory of chunk lists in direct arena |
+| pulsar_ml_cache_used_size | Gauge | The size in bytes used to store the entry payloads |
+| pulsar_ml_count | Gauge | The number of currently opened managed ledgers  |
+
+### ManagedLedger metrics
+All the ManagedLedger metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.
+- namespace: namespace=${pulsar_namespace}. ${pulsar_namespace} is the namespace name.
+- quantile: quantile=${quantile}. The quantile label is only for `Histogram` type metrics, and represents the threshold for the given buckets.
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_ml_AddEntryBytesRate | Gauge | The bytes/s rate of messages added |
+| pulsar_ml_AddEntryErrors | Gauge | The number of addEntry requests that failed |
+| pulsar_ml_AddEntryLatencyBuckets | Histogram | The add entry latency of a ledger with a given quantile (threshold).<br> Available quantile: <br><ul><li> quantile="0.0_0.5" is AddEntryLatency between (0.0ms, 0.5ms]</li> <li>quantile="0.5_1.0" is AddEntryLatency between (0.5ms, 1.0ms]</li><li>quantile="1.0_5.0" is AddEntryLatency between (1ms, 5ms]</li><li>quantile="5.0_10.0" is AddEntryLatency between (5ms, 10ms]</li><li>quantile="10.0_20.0" is AddEntryLatency between (10ms, 20ms]</li>< [...]
+| pulsar_ml_AddEntryLatencyBuckets_OVERFLOW | Gauge | The add entry latency > 1s |
+| pulsar_ml_AddEntryMessagesRate | Gauge | The msg/s rate of messages added |
+| pulsar_ml_AddEntrySucceed | Gauge | The number of addEntry requests that succeeded |
+| pulsar_ml_EntrySizeBuckets | Histogram | The add entry size of a ledger with given quantile.<br> Available quantile: <br><ul><li>quantile="0.0_128.0" is EntrySize between (0byte, 128byte]</li><li>quantile="128.0_512.0" is EntrySize between (128byte, 512byte]</li><li>quantile="512.0_1024.0" is EntrySize between (512byte, 1KB]</li><li>quantile="1024.0_2048.0" is EntrySize between (1KB, 2KB]</li><li>quantile="2048.0_4096.0" is EntrySize between (2KB, 4KB]</li><li>quantile="4096.0_16384.0" [...]
+| pulsar_ml_EntrySizeBuckets_OVERFLOW | Gauge | The add entry size > 1MB |
+| pulsar_ml_LedgerSwitchLatencyBuckets | Histogram | The ledger switch latency with given quantile. <br> Available quantile: <br><ul><li>quantile="0.0_0.5" is LedgerSwitchLatency between (0ms, 0.5ms]</li><li>quantile="0.5_1.0" is LedgerSwitchLatency between (0.5ms, 1ms]</li><li>quantile="1.0_5.0" is LedgerSwitchLatency between (1ms, 5ms]</li><li>quantile="5.0_10.0" is LedgerSwitchLatency between (5ms, 10ms]</li><li>quantile="10.0_20.0" is LedgerSwitchLatency between (10ms, 20ms]</li><li>quantile="20.0_50.0" is LedgerSwitchLatency between (20ms, 5 [...]
+| pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW | Gauge | The ledger switch latency > 1s |
+| pulsar_ml_MarkDeleteRate | Gauge | The rate of mark-delete ops/s |
+| pulsar_ml_NumberOfMessagesInBacklog | Gauge | The number of backlog messages for all the consumers |
+| pulsar_ml_ReadEntriesBytesRate | Gauge | The bytes/s rate of messages read |
+| pulsar_ml_ReadEntriesErrors | Gauge | The number of readEntries requests that failed |
+| pulsar_ml_ReadEntriesRate | Gauge | The msg/s rate of messages read |
+| pulsar_ml_ReadEntriesSucceeded | Gauge | The number of readEntries requests that succeeded |
+| pulsar_ml_StoredMessagesSize | Gauge | The total size of the messages in active ledgers (accounting for the multiple copies stored) |
+
 ### LoadBalancing metrics
 All the loadbalancing metrics are labelled with the following labels:
 - cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you configured in broker.conf.