You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/23 03:21:37 UTC

[pulsar] branch branch-2.7 updated (c68f843 -> 751b521)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from c68f843  make namespaces policy update take effect on time (#8976)
     new df884b8  add if for SubscriptionBusyException (#9017)
     new 257f60a  remove duplicated broker prometheus metrics type (#8995)
     new 057b33e  Issue 8974: Peeking at compressed messages throws an exception (Readonly buffers not supported by Airlift) (#8990)
     new 751b521  Add raw prometheus metrics provider. (#9021)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java    | 25 +++++++++-
 .../broker/service/BrokerServiceException.java     |  2 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  1 -
 .../prometheus/PrometheusMetricsGenerator.java     | 23 ++++++++--
 .../stats/prometheus/PrometheusMetricsServlet.java | 13 +++++-
 ...tats.java => PrometheusRawMetricsProvider.java} | 25 ++++------
 .../pulsar/broker/service/BrokerServiceTest.java   | 34 ++++++++++++++
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 53 ++++++++++++++++++++--
 ...CompressionCodecNone.java => AirliftUtils.java} | 26 +++++------
 .../common/compression/CompressionCodecLZ4.java    |  3 +-
 .../common/compression/CompressionCodecSnappy.java |  1 +
 .../common/compression/CompressionCodecZstd.java   |  2 +-
 .../common/compression/CompressorCodecTest.java    | 31 +++++++++++++
 13 files changed, 198 insertions(+), 41 deletions(-)
 copy pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/{AggregatedConsumerStats.java => PrometheusRawMetricsProvider.java} (69%)
 copy pulsar-common/src/main/java/org/apache/pulsar/common/compression/{CompressionCodecNone.java => AirliftUtils.java} (63%)


[pulsar] 03/04: Issue 8974: Peeking at compressed messages throws an exception (Readonly buffers not supported by Airlift) (#8990)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 057b33ea9c3f0f8bc70d27a069ea0e82ecbe1f70
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Tue Dec 22 18:53:27 2020 +0100

    Issue 8974: Peeking at compressed messages throws an exception (Readonly buffers not supported by Airlift) (#8990)
    
    Fixes #8974
    
    ### Motivation
    In certain cases, peeking at messages on compressed topics returns an error (see #8974) because Airlift does not support read-only ByteBuffers: they do not give access to the underlying array.
    
    ### Modifications
    
    Copy the ByteBuffer in case of an unsupported buffer type
    
    ### Verifying this change
    
    This change adds new tests that reproduce the error and demonstrate that the problem is fixed.
    
    (cherry picked from commit cbc606b0b0e836c1238ea1ba92400b3f14e5b349)
---
 .../pulsar/common/compression/AirliftUtils.java    | 38 ++++++++++++++++++++++
 .../common/compression/CompressionCodecLZ4.java    |  3 +-
 .../common/compression/CompressionCodecSnappy.java |  1 +
 .../common/compression/CompressionCodecZstd.java   |  2 +-
 .../common/compression/CompressorCodecTest.java    | 31 ++++++++++++++++++
 5 files changed, 73 insertions(+), 2 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
new file mode 100644
index 0000000..3bfc609
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.compression;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities.
+ */
+public abstract class AirliftUtils {
+
+    static ByteBuffer ensureAirliftSupported(ByteBuffer encodedNio, int uncompressedLength) {
+        if (!encodedNio.isDirect() && !encodedNio.hasArray()) {
+            // airlift needs a raw ByteArray
+            ByteBuffer copy = ByteBuffer.allocate(uncompressedLength);
+            copy.put(encodedNio);
+            copy.flip();
+            encodedNio = copy;
+        }
+        return encodedNio;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
index 2493af4..12a03d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
@@ -96,11 +96,12 @@ public class CompressionCodecLZ4 implements CompressionCodec {
         } else {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
-
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
             LZ4_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
         uncompressed.writerIndex(uncompressedLength);
         return uncompressed;
     }
+
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
index 517f1ca..1e31edc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
@@ -103,6 +103,7 @@ public class CompressionCodecSnappy implements CompressionCodec {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
 
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
             SNAPPY_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
index 944e1e5..18caee6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
@@ -97,7 +97,7 @@ public class CompressionCodecZstd implements CompressionCodec {
         } else {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
-
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
             ZSTD_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index 46e5718..ec84741 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -84,6 +84,37 @@ public class CompressorCodecTest {
     }
 
     @Test(dataProvider = "codec")
+    void testDecompressReadonlyByteBuf(CompressionType type, String compressedText) throws IOException {
+        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);
+        byte[] data = text.getBytes();
+        ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();
+        raw.writeBytes(data);
+
+        ByteBuf compressed = codec.encode(raw);
+        assertEquals(raw.readableBytes(), data.length);
+
+        int compressedSize = compressed.readableBytes();
+        // Readonly ByteBuffers are not supported by AirLift
+        // https://github.com/apache/pulsar/issues/8974
+        ByteBuf compressedComplexByteBuf = compressed.asReadOnly();
+        ByteBuf uncompressed = codec.decode(compressedComplexByteBuf, data.length);
+
+        assertEquals(compressed.readableBytes(), compressedSize);
+
+        assertEquals(uncompressed.readableBytes(), data.length);
+        assertEquals(uncompressed, raw);
+
+        raw.release();
+        compressed.release();
+        uncompressed.release();
+
+        // Verify compression codecs have the same behavior with buffers ref counting
+        assertEquals(raw.refCnt(), 0);
+        assertEquals(compressed.refCnt(), 0);
+        assertEquals(compressed.refCnt(), 0);
+    }
+
+    @Test(dataProvider = "codec")
     void testEmptyInput(CompressionType type, String compressedText) throws IOException {
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);
 


[pulsar] 02/04: remove duplicated broker prometheus metrics type (#8995)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 257f60a7a020b7427da055c82f1bd40431096a9d
Author: Ming <it...@gmail.com>
AuthorDate: Tue Dec 22 12:53:03 2020 -0500

    remove duplicated broker prometheus metrics type (#8995)
    
    ### Motivation
    
    If there are multiple topics from different namespaces, the broker Prometheus metrics will print out duplicated `# TYPE` definitions for pulsar_ml_AddEntryBytesRate and other managed ledger metrics.
    
    In fact, this problem can be verified by `promtool` https://github.com/prometheus/prometheus#building-from-source
    
    On the broker, run this command to check validity of Pulsar broker metric format.
    `curl localhost:8080/metrics/ | ~/go/bin/promtool check metrics`
    
    ### Modifications
    
    To prevent duplicated metrics type definition, the definition is now tracked and only printed out once. It leverages the existing metrics name Set already defined under parseMetricsToPrometheusMetrics() in PrometheusMetricsGenerator.java
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    This change added tests and can be verified as follows:
    
    Added two topics under a second namespace in testManagedLedgerStats() of PrometheusMetricsTest.java to trigger the condition under which duplicated Prometheus type definitions previously occurred. The updated test case now checks for this duplicated-type problem.
    
    (cherry picked from commit 73198195efc6f25e162840451e054473daf25f17)
---
 .../prometheus/PrometheusMetricsGenerator.java     | 12 +++--
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 53 ++++++++++++++++++++--
 2 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 39e1440..b7fae4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -150,9 +150,15 @@ public class PrometheusMetricsGenerator {
                         continue;
                     }
                 } else {
-                    stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ')
-                            .write(getTypeStr(metricType)).write('\n');
-                    stream.write(entry.getKey().replace("brk_", "pulsar_"))
+
+
+                    String name = entry.getKey();
+                    if (!names.contains(name)) {
+                        stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ')
+                                .write(getTypeStr(metricType)).write('\n');
+                        names.add(name);
+                    }
+                    stream.write(name.replace("brk_", "pulsar_"))
                             .write("{cluster=\"").write(cluster).write('"');
                 }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 96e396f..b5e780e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -471,10 +471,14 @@ public class PrometheusMetricsTest extends BrokerTestBase {
     public void testManagedLedgerStats() throws Exception {
         Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
         Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+        Producer<byte[]> p3 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns2/my-topic1").create();
+        Producer<byte[]> p4 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns2/my-topic2").create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             p1.send(message.getBytes());
             p2.send(message.getBytes());
+            p3.send(message.getBytes());
+            p4.send(message.getBytes());
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
@@ -487,18 +491,59 @@ public class PrometheusMetricsTest extends BrokerTestBase {
                 System.out.println(e.getKey() + ": " + e.getValue())
         );
 
+        Map<String, String> typeDefs = new HashMap<String, String>();
+        Map<String, String> metricNames = new HashMap<String, String>();
+
+        Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
+        Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
+
+        Splitter.on("\n").split(metricsStr).forEach(line -> {
+            if (line.isEmpty()) {
+                return;
+            }
+            if (line.startsWith("#")) {
+                // Check for duplicate type definitions
+                Matcher typeMatcher = typePattern.matcher(line);
+                checkArgument(typeMatcher.matches());
+                String metricName = typeMatcher.group(1);
+                String type = typeMatcher.group(2);
+
+                // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
+                // "Only one TYPE line may exist for a given metric name."
+                if (!typeDefs.containsKey(metricName)) {
+                    typeDefs.put(metricName, type);
+                } else {
+                    fail("Duplicate type definition found for TYPE definition " + metricName);
+                }
+                // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
+                // "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
+                if (metricNames.containsKey(metricName)) {
+                    fail("TYPE definition for " + metricName + " appears after first sample");
+                }
+            } else {
+                Matcher metricMatcher = metricNamePattern.matcher(line);
+                checkArgument(metricMatcher.matches());
+                String metricName = metricMatcher.group(1);
+                metricNames.put(metricName, metricName);
+            }
+        });
+
         List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryBytesRate");
-        assertEquals(cm.size(), 1);
+        assertEquals(cm.size(), 2);
         assertEquals(cm.get(0).tags.get("cluster"), "test");
-        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        String ns = cm.get(0).tags.get("namespace");
+        assertEquals(ns.equals("my-property/use/my-ns") || ns.equals("my-property/use/my-ns2"), true);
 
         cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryMessagesRate");
-        assertEquals(cm.size(), 1);
+        assertEquals(cm.size(), 2);
         assertEquals(cm.get(0).tags.get("cluster"), "test");
-        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        ns = cm.get(0).tags.get("namespace");
+        assertEquals(ns.equals("my-property/use/my-ns") || ns.equals("my-property/use/my-ns2"), true);
 
         p1.close();
         p2.close();
+        p3.close();
+        p4.close();
     }
 
     @Test


[pulsar] 01/04: add if for SubscriptionBusyException (#9017)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit df884b82ca597de45d602f51d5e9d52ecc9ee461
Author: cimura <35...@users.noreply.github.com>
AuthorDate: Wed Dec 23 02:50:53 2020 +0900

    add if for SubscriptionBusyException (#9017)
    
    ### Motivation
    
    When `SubscriptionBusyException` is thrown in a broker, `getClientErrorCode` does not handle it, so the returned error code is incorrect.
    
    ### Modifications
    
    add `else if` for `SubscriptionBusyException` in `getClientErrorCode`
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    (cherry picked from commit 973c14e1bd429946fc9008eabedab9e734fb07c6)
---
 .../java/org/apache/pulsar/broker/service/BrokerServiceException.java   | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 9c6b2d1..c0c0b1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -191,6 +191,8 @@ public class BrokerServiceException extends Exception {
             return PulsarApi.ServerError.PersistenceError;
         } else if (t instanceof ConsumerBusyException) {
             return PulsarApi.ServerError.ConsumerBusy;
+        } else if (t instanceof SubscriptionBusyException) {
+            return PulsarApi.ServerError.ConsumerBusy;
         } else if (t instanceof ProducerBusyException) {
             return PulsarApi.ServerError.ProducerBusy;
         } else if (t instanceof UnsupportedVersionException) {


[pulsar] 04/04: Add raw prometheus metrics provider. (#9021)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 751b521edbc02544192e6b009abab0708c8bb0c7
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Dec 23 08:53:01 2020 +0800

    Add raw prometheus metrics provider. (#9021)
    
    Pulsar supports plugins such as protocol handlers and broker interceptors. This PR adds a PrometheusRawMetricsProvider, which gives plugins the ability to add metrics to the broker /metrics endpoint.
    
    (cherry picked from commit b9493fe0aa42ac0eac1aadd20f169c9107acecc3)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 25 +++++++++++++++-
 .../stats/prometheus/NamespaceStatsAggregator.java |  1 -
 .../prometheus/PrometheusMetricsGenerator.java     | 11 +++++++
 .../stats/prometheus/PrometheusMetricsServlet.java | 13 ++++++++-
 .../prometheus/PrometheusRawMetricsProvider.java   | 33 +++++++++++++++++++++
 .../pulsar/broker/service/BrokerServiceTest.java   | 34 ++++++++++++++++++++++
 6 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 61c1cea..785ff43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -38,6 +38,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -95,6 +96,7 @@ import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.broker.validator.MultipleListenerValidator;
@@ -213,6 +215,9 @@ public class PulsarService implements AutoCloseable {
 
     private BrokerInterceptor brokerInterceptor;
 
+    private PrometheusMetricsServlet metricsServlet;
+    private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
+
     public enum State {
         Init, Started, Closed
     }
@@ -499,9 +504,16 @@ public class PulsarService implements AutoCloseable {
             this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap);
             this.webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap);
             this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
+            this.metricsServlet = new PrometheusMetricsServlet(
+                    this, config.isExposeTopicLevelMetricsInPrometheus(),
+                    config.isExposeConsumerLevelMetricsInPrometheus());
+            if (pendingMetricsProviders != null) {
+                pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
+                this.pendingMetricsProviders = null;
+            }
 
             this.webService.addServlet("/metrics",
-                    new ServletHolder(new PrometheusMetricsServlet(this, config.isExposeTopicLevelMetricsInPrometheus(), config.isExposeConsumerLevelMetricsInPrometheus())),
+                    new ServletHolder(metricsServlet),
                     false, attributeMap);
 
             if (config.isWebSocketServiceEnabled()) {
@@ -1207,6 +1219,17 @@ public class PulsarService implements AutoCloseable {
         return topicPoliciesService;
     }
 
+    public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
+        if (metricsServlet == null) {
+            if (pendingMetricsProviders == null) {
+                pendingMetricsProviders = new LinkedList<>();
+            }
+            pendingMetricsProviders.add(metricsProvider);
+        } else {
+            this.metricsServlet.addRawMetricsProvider(metricsProvider);
+        }
+    }
+
     private void startWorkerService(AuthenticationService authenticationService,
                                     AuthorizationService authorizationService)
             throws InterruptedException, IOException, KeeperException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 5766091..ab2e670 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -56,7 +56,6 @@ public class NamespaceStatsAggregator {
         printDefaultBrokerStats(stream, cluster);
 
         LongAdder topicsCount = new LongAdder();
-
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
             topicsCount.reset();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index b7fae4b..bd7ede4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.io.StringWriter;
@@ -86,6 +87,11 @@ public class PrometheusMetricsGenerator {
     }
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, OutputStream out) throws IOException {
+        generate(pulsar, includeTopicMetrics, includeConsumerMetrics, out, null);
+    }
+
+    public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
+        OutputStream out, List<PrometheusRawMetricsProvider> metricsProviders) throws IOException {
         ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
         try {
             SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
@@ -101,6 +107,11 @@ public class PrometheusMetricsGenerator {
 
             generateManagedLedgerBookieClientMetrics(pulsar, stream);
 
+            if (metricsProviders != null) {
+                for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) {
+                    metricsProvider.generate(stream);
+                }
+            }
             out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
         } finally {
             buf.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 12058c1..7fbfb75 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.stats.prometheus;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -44,6 +46,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
     private final PulsarService pulsar;
     private final boolean shouldExportTopicMetrics;
     private final boolean shouldExportConsumerMetrics;
+    private List<PrometheusRawMetricsProvider> metricsProviders;
 
     private ExecutorService executor = null;
 
@@ -67,7 +70,8 @@ public class PrometheusMetricsServlet extends HttpServlet {
             try {
                 res.setStatus(HttpStatus.OK_200);
                 res.setContentType("text/plain");
-                PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, res.getOutputStream());
+                PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
+                        res.getOutputStream(), metricsProviders);
                 context.complete();
 
             } catch (IOException e) {
@@ -85,5 +89,12 @@ public class PrometheusMetricsServlet extends HttpServlet {
         }
     }
 
+    public void addRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
+        if (metricsProviders == null) {
+            metricsProviders = new LinkedList<>();
+        }
+        metricsProviders.add(metricsProvider);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PrometheusMetricsServlet.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
new file mode 100644
index 0000000..d206776
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * The prometheus metrics provider for generate prometheus format metrics.
+ */
+public interface PrometheusRawMetricsProvider {
+
+    /**
+     * Generate the metrics from the metrics provider.
+     * @param stream the stream that write the metrics to
+     */
+    void generate(SimpleTextOutputStream stream);
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 40fa9f9..97ddcca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -29,7 +29,10 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -51,8 +54,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Consumer;
@@ -68,6 +77,8 @@ import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -962,4 +973,27 @@ public class BrokerServiceTest extends BrokerTestBase {
         }
         assertNull(ledgers.get(topicMlName));
     }
+
+    @Test
+    public void testMetricsProvider() throws IOException {
+        PrometheusRawMetricsProvider rawMetricsProvider = new PrometheusRawMetricsProvider() {
+            @Override
+            public void generate(SimpleTextOutputStream stream) {
+                stream.write("test_metrics{label1=\"xyz\"} 10 \n");
+            }
+        };
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();
+        InputStreamReader isReader = new InputStreamReader(inputStream);
+        BufferedReader reader = new BufferedReader(isReader);
+        StringBuffer sb = new StringBuffer();
+        String str;
+        while((str = reader.readLine()) != null){
+            sb.append(str);
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));
+    }
 }