You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/23 03:00:16 UTC
[pulsar] branch branch-2.6 updated: Add raw prometheus metrics
provider. (#9021)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 3d13451 Add raw prometheus metrics provider. (#9021)
3d13451 is described below
commit 3d134512f802ef10869eec145b8113f331011618
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Dec 23 08:53:01 2020 +0800
Add raw prometheus metrics provider. (#9021)
Pulsar supports plugins such as protocol handlers and broker interceptors. This PR adds a PrometheusRawMetricsProvider, which gives plugins the ability to add metrics to the broker /metrics endpoint.
(cherry picked from commit b9493fe0aa42ac0eac1aadd20f169c9107acecc3)
---
.../org/apache/pulsar/broker/PulsarService.java | 25 +++++++++++++++-
.../stats/prometheus/NamespaceStatsAggregator.java | 1 -
.../prometheus/PrometheusMetricsGenerator.java | 11 +++++++
.../stats/prometheus/PrometheusMetricsServlet.java | 13 ++++++++-
.../prometheus/PrometheusRawMetricsProvider.java | 33 +++++++++++++++++++++
.../pulsar/broker/service/BrokerServiceTest.java | 34 ++++++++++++++++++++++
6 files changed, 114 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6b2e48a..5f9db9b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -38,6 +38,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -95,6 +96,7 @@ import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -205,6 +207,9 @@ public class PulsarService implements AutoCloseable {
private TransactionMetadataStoreService transactionMetadataStoreService;
private BrokerInterceptor brokerInterceptor;
+ private PrometheusMetricsServlet metricsServlet;
+ private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
+
public enum State {
Init, Started, Closed
}
@@ -485,9 +490,16 @@ public class PulsarService implements AutoCloseable {
this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap);
this.webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap);
this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
+ this.metricsServlet = new PrometheusMetricsServlet(
+ this, config.isExposeTopicLevelMetricsInPrometheus(),
+ config.isExposeConsumerLevelMetricsInPrometheus());
+ if (pendingMetricsProviders != null) {
+ pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
+ this.pendingMetricsProviders = null;
+ }
this.webService.addServlet("/metrics",
- new ServletHolder(new PrometheusMetricsServlet(this, config.isExposeTopicLevelMetricsInPrometheus(), config.isExposeConsumerLevelMetricsInPrometheus())),
+ new ServletHolder(metricsServlet),
false, attributeMap);
if (config.isWebSocketServiceEnabled()) {
@@ -1178,6 +1190,17 @@ public class PulsarService implements AutoCloseable {
return topicPoliciesService;
}
+    /**
+     * Registers a provider whose raw Prometheus-format output is added to the broker's
+     * {@code /metrics} response.
+     *
+     * <p>If the metrics servlet has not been created yet, the provider is queued in
+     * {@code pendingMetricsProviders}; otherwise it is handed to the servlet directly.
+     *
+     * @param metricsProvider the provider to register; must not be {@code null}
+     * @throws NullPointerException if {@code metricsProvider} is {@code null}
+     */
+    public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
+        // Fail fast: a null entry would otherwise only surface later, when the provider list is iterated.
+        Objects.requireNonNull(metricsProvider, "metricsProvider");
+        if (metricsServlet == null) {
+            if (pendingMetricsProviders == null) {
+                pendingMetricsProviders = new LinkedList<>();
+            }
+            pendingMetricsProviders.add(metricsProvider);
+        } else {
+            this.metricsServlet.addRawMetricsProvider(metricsProvider);
+        }
+    }
+
private void startWorkerService(AuthenticationService authenticationService,
AuthorizationService authorizationService)
throws InterruptedException, IOException, KeeperException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a0d8732..dc9e0e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -56,7 +56,6 @@ public class NamespaceStatsAggregator {
printDefaultBrokerStats(stream, cluster);
LongAdder topicsCount = new LongAdder();
-
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();
topicsCount.reset();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 710cee5..f3ae5e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.io.StringWriter;
@@ -76,6 +77,11 @@ public class PrometheusMetricsGenerator {
}
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, OutputStream out) throws IOException {
+ generate(pulsar, includeTopicMetrics, includeConsumerMetrics, out, null);
+ }
+
+ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
+ OutputStream out, List<PrometheusRawMetricsProvider> metricsProviders) throws IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
try {
SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
@@ -91,6 +97,11 @@ public class PrometheusMetricsGenerator {
generateManagedLedgerBookieClientMetrics(pulsar, stream);
+ if (metricsProviders != null) {
+ for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) {
+ metricsProvider.generate(stream);
+ }
+ }
out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
} finally {
buf.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 12058c1..7fbfb75 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.stats.prometheus;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -44,6 +46,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
private final PulsarService pulsar;
private final boolean shouldExportTopicMetrics;
private final boolean shouldExportConsumerMetrics;
+ private List<PrometheusRawMetricsProvider> metricsProviders;
private ExecutorService executor = null;
@@ -67,7 +70,8 @@ public class PrometheusMetricsServlet extends HttpServlet {
try {
res.setStatus(HttpStatus.OK_200);
res.setContentType("text/plain");
- PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, res.getOutputStream());
+ PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
+ res.getOutputStream(), metricsProviders);
context.complete();
} catch (IOException e) {
@@ -85,5 +89,12 @@ public class PrometheusMetricsServlet extends HttpServlet {
}
}
+    /**
+     * Adds a raw metrics provider. The registered providers are passed to
+     * {@link PrometheusMetricsGenerator} each time the metrics response is generated.
+     *
+     * @param metricsProvider the provider to add; must not be {@code null}
+     * @throws NullPointerException if {@code metricsProvider} is {@code null}
+     */
+    public void addRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
+        // Reject null here so it cannot end up in the list that is iterated during metrics generation.
+        if (metricsProvider == null) {
+            throw new NullPointerException("metricsProvider");
+        }
+        if (metricsProviders == null) {
+            // The list is created lazily on first registration.
+            metricsProviders = new LinkedList<>();
+        }
+        metricsProviders.add(metricsProvider);
+    }
+
private static final Logger log = LoggerFactory.getLogger(PrometheusMetricsServlet.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
new file mode 100644
index 0000000..d206776
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * A provider that writes additional metrics, in Prometheus format, to the metrics output stream.
+ *
+ * <p>The interface has a single abstract method, so it can be implemented with a lambda.
+ */
+@FunctionalInterface
+public interface PrometheusRawMetricsProvider {
+
+    /**
+     * Writes the metrics of this provider to the given stream.
+     *
+     * @param stream the stream to write the metrics to
+     */
+    void generate(SimpleTextOutputStream stream);
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index faf968e..41087b3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -29,7 +29,10 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.HashMap;
@@ -53,8 +56,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
@@ -71,6 +80,8 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -965,4 +976,27 @@ public class BrokerServiceTest extends BrokerTestBase {
}
assertNull(ledgers.get(topicMlName));
}
+
+    /**
+     * Verifies that the output of a registered raw metrics provider appears in the
+     * response of the broker's /metrics endpoint.
+     */
+    @Test
+    public void testMetricsProvider() throws IOException {
+        PrometheusRawMetricsProvider rawMetricsProvider =
+                stream -> stream.write("test_metrics{label1=\"xyz\"} 10 \n");
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        StringBuilder sb = new StringBuilder();
+        // try-with-resources closes the reader and, through it, the response content stream.
+        // The charset is explicit so the test does not depend on the platform default.
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+                response.getEntity().getContent(), java.nio.charset.StandardCharsets.UTF_8))) {
+            String str;
+            while ((str = reader.readLine()) != null) {
+                sb.append(str);
+            }
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));
+    }
}
\ No newline at end of file