You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/23 03:00:16 UTC

[pulsar] branch branch-2.6 updated: Add raw prometheus metrics provider. (#9021)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 3d13451  Add raw prometheus metrics provider. (#9021)
3d13451 is described below

commit 3d134512f802ef10869eec145b8113f331011618
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Dec 23 08:53:01 2020 +0800

    Add raw prometheus metrics provider. (#9021)
    
    Pulsar support such plugins as protocol handlers and broker interceptors. This PR is added a RawPrometheusMetrcsProvider which can provide the ability to plugins to add metrics to the broker /metrics endpoint.
    
    (cherry picked from commit b9493fe0aa42ac0eac1aadd20f169c9107acecc3)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 25 +++++++++++++++-
 .../stats/prometheus/NamespaceStatsAggregator.java |  1 -
 .../prometheus/PrometheusMetricsGenerator.java     | 11 +++++++
 .../stats/prometheus/PrometheusMetricsServlet.java | 13 ++++++++-
 .../prometheus/PrometheusRawMetricsProvider.java   | 33 +++++++++++++++++++++
 .../pulsar/broker/service/BrokerServiceTest.java   | 34 ++++++++++++++++++++++
 6 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6b2e48a..5f9db9b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -38,6 +38,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -95,6 +96,7 @@ import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.broker.validator.MultipleListenerValidator;
 import org.apache.pulsar.broker.web.WebService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -205,6 +207,9 @@ public class PulsarService implements AutoCloseable {
     private TransactionMetadataStoreService transactionMetadataStoreService;
     private BrokerInterceptor brokerInterceptor;
 
+    private PrometheusMetricsServlet metricsServlet;
+    private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
+
     public enum State {
         Init, Started, Closed
     }
@@ -485,9 +490,16 @@ public class PulsarService implements AutoCloseable {
             this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap);
             this.webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap);
             this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
+            this.metricsServlet = new PrometheusMetricsServlet(
+                    this, config.isExposeTopicLevelMetricsInPrometheus(),
+                    config.isExposeConsumerLevelMetricsInPrometheus());
+            if (pendingMetricsProviders != null) {
+                pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
+                this.pendingMetricsProviders = null;
+            }
 
             this.webService.addServlet("/metrics",
-                    new ServletHolder(new PrometheusMetricsServlet(this, config.isExposeTopicLevelMetricsInPrometheus(), config.isExposeConsumerLevelMetricsInPrometheus())),
+                    new ServletHolder(metricsServlet),
                     false, attributeMap);
 
             if (config.isWebSocketServiceEnabled()) {
@@ -1178,6 +1190,17 @@ public class PulsarService implements AutoCloseable {
         return topicPoliciesService;
     }
 
+    public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
+        if (metricsServlet == null) {
+            if (pendingMetricsProviders == null) {
+                pendingMetricsProviders = new LinkedList<>();
+            }
+            pendingMetricsProviders.add(metricsProvider);
+        } else {
+            this.metricsServlet.addRawMetricsProvider(metricsProvider);
+        }
+    }
+
     private void startWorkerService(AuthenticationService authenticationService,
                                     AuthorizationService authorizationService)
             throws InterruptedException, IOException, KeeperException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a0d8732..dc9e0e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -56,7 +56,6 @@ public class NamespaceStatsAggregator {
         printDefaultBrokerStats(stream, cluster);
 
         LongAdder topicsCount = new LongAdder();
-
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
             topicsCount.reset();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 710cee5..f3ae5e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.io.StringWriter;
@@ -76,6 +77,11 @@ public class PrometheusMetricsGenerator {
     }
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, OutputStream out) throws IOException {
+        generate(pulsar, includeTopicMetrics, includeConsumerMetrics, out, null);
+    }
+
+    public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
+        OutputStream out, List<PrometheusRawMetricsProvider> metricsProviders) throws IOException {
         ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
         try {
             SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
@@ -91,6 +97,11 @@ public class PrometheusMetricsGenerator {
 
             generateManagedLedgerBookieClientMetrics(pulsar, stream);
 
+            if (metricsProviders != null) {
+                for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) {
+                    metricsProvider.generate(stream);
+                }
+            }
             out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
         } finally {
             buf.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 12058c1..7fbfb75 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.stats.prometheus;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -44,6 +46,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
     private final PulsarService pulsar;
     private final boolean shouldExportTopicMetrics;
     private final boolean shouldExportConsumerMetrics;
+    private List<PrometheusRawMetricsProvider> metricsProviders;
 
     private ExecutorService executor = null;
 
@@ -67,7 +70,8 @@ public class PrometheusMetricsServlet extends HttpServlet {
             try {
                 res.setStatus(HttpStatus.OK_200);
                 res.setContentType("text/plain");
-                PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, res.getOutputStream());
+                PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
+                        res.getOutputStream(), metricsProviders);
                 context.complete();
 
             } catch (IOException e) {
@@ -85,5 +89,12 @@ public class PrometheusMetricsServlet extends HttpServlet {
         }
     }
 
+    public void addRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
+        if (metricsProviders == null) {
+            metricsProviders = new LinkedList<>();
+        }
+        metricsProviders.add(metricsProvider);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PrometheusMetricsServlet.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
new file mode 100644
index 0000000..d206776
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * The prometheus metrics provider for generate prometheus format metrics.
+ */
+public interface PrometheusRawMetricsProvider {
+
+    /**
+     * Generate the metrics from the metrics provider.
+     * @param stream the stream that write the metrics to
+     */
+    void generate(SimpleTextOutputStream stream);
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index faf968e..41087b3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -29,7 +29,10 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.HashMap;
@@ -53,8 +56,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Consumer;
@@ -71,6 +80,8 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -965,4 +976,27 @@ public class BrokerServiceTest extends BrokerTestBase {
         }
         assertNull(ledgers.get(topicMlName));
     }
+
+    @Test
+    public void testMetricsProvider() throws IOException {
+        PrometheusRawMetricsProvider rawMetricsProvider = new PrometheusRawMetricsProvider() {
+            @Override
+            public void generate(SimpleTextOutputStream stream) {
+                stream.write("test_metrics{label1=\"xyz\"} 10 \n");
+            }
+        };
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();
+        InputStreamReader isReader = new InputStreamReader(inputStream);
+        BufferedReader reader = new BufferedReader(isReader);
+        StringBuffer sb = new StringBuffer();
+        String str;
+        while((str = reader.readLine()) != null){
+            sb.append(str);
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));
+    }
 }
\ No newline at end of file