You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/07 01:00:07 UTC

[incubator-pulsar] branch master updated: Fixed storage latency stats with Prometheus (#1185)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 69db894  Fixed storage latency stats with Prometheus (#1185)
69db894 is described below

commit 69db8942eafe7b712227e8804ebaa2afc58b6e70
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Feb 6 17:00:01 2018 -0800

    Fixed storage latency stats with Prometheus (#1185)
    
    * Fixed storage latency stats with Prometheus
    
    * Added tests for prometheus stats generator
    
    * Added license header
---
 .../stats/prometheus/NamespaceStatsAggregator.java |  26 ++-
 .../prometheus/PrometheusMetricsGenerator.java     |  10 +-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  54 ++++--
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 184 +++++++++++++++++++++
 4 files changed, 252 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 7fc58ba..18693da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -36,10 +36,17 @@ public class NamespaceStatsAggregator {
         }
     };
 
+    private static FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
+        @Override
+        protected TopicStats initialValue() throws Exception {
+            return new TopicStats();
+        }
+    };
+
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, SimpleTextOutputStream stream) {
         String cluster = pulsar.getConfiguration().getClusterName();
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
-        TopicStats topicStats = new TopicStats();
+        TopicStats topicStats = localTopicStats.get();
 
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
@@ -47,27 +54,36 @@ public class NamespaceStatsAggregator {
             bundlesMap.forEach((bundle, topicsMap) -> {
                 topicsMap.forEach((name, topic) -> {
                     getTopicStats(topic, topicStats);
-                    namespaceStats.updateStats(topicStats);
+
                     if (includeTopicMetrics) {
-                        TopicStats.printNamespaceStats(stream, cluster, namespace, name, topicStats);
+                        TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats);
+                    } else {
+                        namespaceStats.updateStats(topicStats);
                     }
                 });
             });
 
-            printNamespaceStats(stream, cluster, namespace, namespaceStats);
+            if (!includeTopicMetrics) {
+                // Only include namespace level stats if we don't have the per-topic, otherwise we're going to report
+                // the same data twice, and it will make the aggregation difficult
+                printNamespaceStats(stream, cluster, namespace, namespaceStats);
+            }
         });
     }
 
     private static void getTopicStats(Topic topic, TopicStats stats) {
         stats.reset();
 
-        if(topic instanceof PersistentTopic) {
+        if (topic instanceof PersistentTopic) {
             // Managed Ledger stats
             ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ((PersistentTopic)topic).getManagedLedger().getStats();
 
             stats.storageSize = mlStats.getStoredMessagesSize();
+
             stats.storageWriteLatencyBuckets.addAll(mlStats.getInternalAddEntryLatencyBuckets());
+            stats.storageWriteLatencyBuckets.refresh();
             stats.entrySizeBuckets.addAll(mlStats.getInternalEntrySizeBuckets());
+            stats.entrySizeBuckets.refresh();
 
             stats.storageWriteRate = mlStats.getAddEntryMessagesRate();
             stats.storageReadRate = mlStats.getReadEntriesRate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 2afc8a4..26118c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -25,11 +25,10 @@ import java.util.Enumeration;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
 import org.apache.pulsar.utils.SimpleTextOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.internal.PlatformDependent;
 import io.prometheus.client.Collector;
 import io.prometheus.client.Collector.MetricFamilySamples;
 import io.prometheus.client.Collector.MetricFamilySamples.Sample;
@@ -56,10 +55,9 @@ public class PrometheusMetricsGenerator {
         }).register(CollectorRegistry.defaultRegistry);
 
         Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
-            @SuppressWarnings("restriction")
             @Override
             public double get() {
-                return sun.misc.VM.maxDirectMemory();
+                return PlatformDependent.maxDirectMemory();
             }
         }).register(CollectorRegistry.defaultRegistry);
     }
@@ -92,7 +90,9 @@ public class PrometheusMetricsGenerator {
                     stream.write(sample.labelNames.get(j));
                     stream.write("=\"");
                     stream.write(sample.labelValues.get(j));
-                    stream.write("\",");
+                    if (j != sample.labelNames.size() - 1) {
+                        stream.write("\",");
+                    }
                 }
 
                 stream.write("} ");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 02923cc..a750b75 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -64,24 +64,54 @@ class TopicStats {
         entrySizeBuckets.reset();
     }
 
-    static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-        TopicStats stats) {
+    static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            TopicStats stats) {
 
-        metric(stream, cluster, namespace, topic,"pulsar_subscriptions_count", stats.subscriptionsCount);
-        metric(stream, cluster, namespace, topic,"pulsar_producers_count", stats.producersCount);
-        metric(stream, cluster, namespace, topic,"pulsar_consumers_count", stats.consumersCount);
+        metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
+        metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
+        metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
 
-        metric(stream, cluster, namespace, topic,"pulsar_rate_in", stats.rateIn);
-        metric(stream, cluster, namespace, topic,"pulsar_rate_out", stats.rateOut);
-        metric(stream, cluster, namespace, topic,"pulsar_throughput_in", stats.throughputIn);
-        metric(stream, cluster, namespace, topic,"pulsar_throughput_out", stats.throughputOut);
+        metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn);
+        metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut);
+        metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn);
+        metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut);
+
+        metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.storageSize);
+        metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog);
+
+        long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count",
+                stats.storageWriteLatencyBuckets.getCount());
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
+                stats.storageWriteLatencyBuckets.getSum());
+
+        long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
 
-        metric(stream, cluster, namespace, topic,"pulsar_storage_size", stats.storageSize);
-        metric(stream, cluster, namespace, topic,"pulsar_msg_backlog", stats.msgBacklog);
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-        String name, double value) {
+            String name, double value) {
         stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
                 .write("\", topic=\"").write(topic).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
new file mode 100644
index 0000000..be19b57
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.testng.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.api.Producer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+public class PrometheusMetricsTest extends BrokerTestBase {
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testPerTopicStats() throws Exception {
+        Producer p1 = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1");
+        Producer p2 = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2");
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        metrics.entries().forEach(e -> {
+            System.out.println(e.getKey() + ": " + e.getValue());
+        });
+
+        // There should be 2 metrics with different tags for each topic
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_storage_write_latency_le_1");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+
+        cm = (List<Metric>) metrics.get("pulsar_producers_count");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).value, 1.0);
+        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).value, 1.0);
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+
+        p1.close();
+        p2.close();
+    }
+
+    @Test
+    public void testPerNamespaceStats() throws Exception {
+        Producer p1 = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1");
+        Producer p2 = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2");
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        metrics.entries().forEach(e -> {
+            System.out.println(e.getKey() + ": " + e.getValue());
+        });
+
+        // There should be 1 metric aggregated per namespace
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_storage_write_latency_le_1");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("topic"), null);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        cm = (List<Metric>) metrics.get("pulsar_producers_count");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 2.0);
+        assertEquals(cm.get(0).tags.get("topic"), null);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        p1.close();
+        p2.close();
+    }
+
+    /**
+     * Hacky parsing of Prometheus text format. Should be good enough for unit tests.
+     */
+    private static Multimap<String, Metric> parseMetrics(String metrics) {
+        Multimap<String, Metric> parsed = ArrayListMultimap.create();
+
+        // Example lines:
+        // jvm_threads_current{cluster="standalone",} 203.0
+        // or
+        // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
+        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+        Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
+        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+
+        Splitter.on("\n").split(metrics).forEach(line -> {
+            if (line.isEmpty()) {
+                return;
+            }
+
+            Matcher matcher = pattern.matcher(line);
+
+            checkArgument(matcher.matches());
+            String name = matcher.group(1);
+
+            Metric m = new Metric();
+            m.value = Double.valueOf(matcher.group(3));
+
+            String tags = matcher.group(2);
+            Matcher tagsMatcher = tagsPattern.matcher(tags);
+            while (tagsMatcher.find()) {
+                String tag = tagsMatcher.group(1);
+                String value = tagsMatcher.group(2);
+                m.tags.put(tag, value);
+            }
+
+            parsed.put(name, m);
+        });
+
+        return parsed;
+    }
+
+    static class Metric {
+        Map<String, String> tags = new TreeMap<>();
+        double value;
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this).add("tags", tags).add("value", value).toString();
+        }
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.