You are viewing a plain text version of this content; the hyperlink to its canonical version is not preserved in this copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/07 01:00:07 UTC
[incubator-pulsar] branch master updated: Fixed storage latency stats with Prometheus (#1185)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 69db894 Fixed storage latency stats with Prometheus (#1185)
69db894 is described below
commit 69db8942eafe7b712227e8804ebaa2afc58b6e70
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Feb 6 17:00:01 2018 -0800
Fixed storage latency stats with Prometheus (#1185)
* Fixed storage latency stats with Prometheus
* Added tests for prometheus stats generator
* Added license header
---
.../stats/prometheus/NamespaceStatsAggregator.java | 26 ++-
.../prometheus/PrometheusMetricsGenerator.java | 10 +-
.../pulsar/broker/stats/prometheus/TopicStats.java | 54 ++++--
.../pulsar/broker/stats/PrometheusMetricsTest.java | 184 +++++++++++++++++++++
4 files changed, 252 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 7fc58ba..18693da 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -36,10 +36,17 @@ public class NamespaceStatsAggregator {
}
};
+ private static FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
+ @Override
+ protected TopicStats initialValue() throws Exception {
+ return new TopicStats();
+ }
+ };
+
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, SimpleTextOutputStream stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
- TopicStats topicStats = new TopicStats();
+ TopicStats topicStats = localTopicStats.get();
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();
@@ -47,27 +54,36 @@ public class NamespaceStatsAggregator {
bundlesMap.forEach((bundle, topicsMap) -> {
topicsMap.forEach((name, topic) -> {
getTopicStats(topic, topicStats);
- namespaceStats.updateStats(topicStats);
+
if (includeTopicMetrics) {
- TopicStats.printNamespaceStats(stream, cluster, namespace, name, topicStats);
+ TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats);
+ } else {
+ namespaceStats.updateStats(topicStats);
}
});
});
- printNamespaceStats(stream, cluster, namespace, namespaceStats);
+ if (!includeTopicMetrics) {
+ // Only include namespace level stats if we don't have the per-topic, otherwise we're going to report
+ // the same data twice, and it will make the aggregation difficult
+ printNamespaceStats(stream, cluster, namespace, namespaceStats);
+ }
});
}
private static void getTopicStats(Topic topic, TopicStats stats) {
stats.reset();
- if(topic instanceof PersistentTopic) {
+ if (topic instanceof PersistentTopic) {
// Managed Ledger stats
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ((PersistentTopic)topic).getManagedLedger().getStats();
stats.storageSize = mlStats.getStoredMessagesSize();
+
stats.storageWriteLatencyBuckets.addAll(mlStats.getInternalAddEntryLatencyBuckets());
+ stats.storageWriteLatencyBuckets.refresh();
stats.entrySizeBuckets.addAll(mlStats.getInternalEntrySizeBuckets());
+ stats.entrySizeBuckets.refresh();
stats.storageWriteRate = mlStats.getAddEntryMessagesRate();
stats.storageReadRate = mlStats.getReadEntriesRate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 2afc8a4..26118c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -25,11 +25,10 @@ import java.util.Enumeration;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
import org.apache.pulsar.utils.SimpleTextOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.internal.PlatformDependent;
import io.prometheus.client.Collector;
import io.prometheus.client.Collector.MetricFamilySamples;
import io.prometheus.client.Collector.MetricFamilySamples.Sample;
@@ -56,10 +55,9 @@ public class PrometheusMetricsGenerator {
}).register(CollectorRegistry.defaultRegistry);
Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
- @SuppressWarnings("restriction")
@Override
public double get() {
- return sun.misc.VM.maxDirectMemory();
+ return PlatformDependent.maxDirectMemory();
}
}).register(CollectorRegistry.defaultRegistry);
}
@@ -92,7 +90,9 @@ public class PrometheusMetricsGenerator {
stream.write(sample.labelNames.get(j));
stream.write("=\"");
stream.write(sample.labelValues.get(j));
- stream.write("\",");
+ if (j != sample.labelNames.size() - 1) {
+ stream.write("\",");
+ }
}
stream.write("} ");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 02923cc..a750b75 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -64,24 +64,54 @@ class TopicStats {
entrySizeBuckets.reset();
}
- static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- TopicStats stats) {
+ static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+ TopicStats stats) {
- metric(stream, cluster, namespace, topic,"pulsar_subscriptions_count", stats.subscriptionsCount);
- metric(stream, cluster, namespace, topic,"pulsar_producers_count", stats.producersCount);
- metric(stream, cluster, namespace, topic,"pulsar_consumers_count", stats.consumersCount);
+ metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
+ metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
+ metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
- metric(stream, cluster, namespace, topic,"pulsar_rate_in", stats.rateIn);
- metric(stream, cluster, namespace, topic,"pulsar_rate_out", stats.rateOut);
- metric(stream, cluster, namespace, topic,"pulsar_throughput_in", stats.throughputIn);
- metric(stream, cluster, namespace, topic,"pulsar_throughput_out", stats.throughputOut);
+ metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn);
+ metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut);
+ metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn);
+ metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut);
+
+ metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.storageSize);
+ metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog);
+
+ long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count",
+ stats.storageWriteLatencyBuckets.getCount());
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
+ stats.storageWriteLatencyBuckets.getSum());
+
+ long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
- metric(stream, cluster, namespace, topic,"pulsar_storage_size", stats.storageSize);
- metric(stream, cluster, namespace, topic,"pulsar_msg_backlog", stats.msgBacklog);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String name, double value) {
+ String name, double value) {
stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
.write("\", topic=\"").write(topic).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
new file mode 100644
index 0000000..be19b57
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.testng.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.api.Producer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+public class PrometheusMetricsTest extends BrokerTestBase {
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testPerTopicStats() throws Exception {
+ Producer p1 = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1");
+ Producer p2 = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2");
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ p1.send(message.getBytes());
+ p2.send(message.getBytes());
+ }
+
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, statsOut);
+ String metricsStr = new String(statsOut.toByteArray());
+
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+ metrics.entries().forEach(e -> {
+ System.out.println(e.getKey() + ": " + e.getValue());
+ });
+
+ // There should be 2 metrics with different tags for each topic
+ List<Metric> cm = (List<Metric>) metrics.get("pulsar_storage_write_latency_le_1");
+ assertEquals(cm.size(), 2);
+ assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+
+ cm = (List<Metric>) metrics.get("pulsar_producers_count");
+ assertEquals(cm.size(), 2);
+ assertEquals(cm.get(0).value, 1.0);
+ assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(1).value, 1.0);
+ assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+
+ p1.close();
+ p2.close();
+ }
+
+ @Test
+ public void testPerNamespaceStats() throws Exception {
+ Producer p1 = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1");
+ Producer p2 = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2");
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ p1.send(message.getBytes());
+ p2.send(message.getBytes());
+ }
+
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, false, statsOut);
+ String metricsStr = new String(statsOut.toByteArray());
+
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+ metrics.entries().forEach(e -> {
+ System.out.println(e.getKey() + ": " + e.getValue());
+ });
+
+ // There should be 1 metric aggregated per namespace
+ List<Metric> cm = (List<Metric>) metrics.get("pulsar_storage_write_latency_le_1");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).tags.get("topic"), null);
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+ cm = (List<Metric>) metrics.get("pulsar_producers_count");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).value, 2.0);
+ assertEquals(cm.get(0).tags.get("topic"), null);
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+ p1.close();
+ p2.close();
+ }
+
+ /**
+ * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
+ */
+ private static Multimap<String, Metric> parseMetrics(String metrics) {
+ Multimap<String, Metric> parsed = ArrayListMultimap.create();
+
+ // Example of lines are
+ // jvm_threads_current{cluster="standalone",} 203.0
+ // or
+ // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
+ // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+ Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
+ Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+
+ Splitter.on("\n").split(metrics).forEach(line -> {
+ if (line.isEmpty()) {
+ return;
+ }
+
+ Matcher matcher = pattern.matcher(line);
+
+ checkArgument(matcher.matches());
+ String name = matcher.group(1);
+
+ Metric m = new Metric();
+ m.value = Double.valueOf(matcher.group(3));
+
+ String tags = matcher.group(2);
+ Matcher tagsMatcher = tagsPattern.matcher(tags);
+ while (tagsMatcher.find()) {
+ String tag = tagsMatcher.group(1);
+ String value = tagsMatcher.group(2);
+ m.tags.put(tag, value);
+ }
+
+ parsed.put(name, m);
+ });
+
+ return parsed;
+ }
+
+ static class Metric {
+ Map<String, String> tags = new TreeMap<>();
+ double value;
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("tags", tags).add("value", value).toString();
+ }
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.