You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/04/01 02:11:09 UTC

[rocketmq-mqtt] branch main updated: [ISSUE #38][part 1] just add prometheus frame support (#39)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/main by this push:
     new ab353d0  [ISSUE #38][part 1] just add prometheus frame support (#39)
ab353d0 is described below

commit ab353d08a959796090c8b0d33339fb5c42713b75
Author: tianliuliu <64...@qq.com>
AuthorDate: Fri Apr 1 10:11:04 2022 +0800

    [ISSUE #38][part 1] just add prometheus frame support (#39)
    
    * fix #38 support prometheus exporter
    
    * fix #38 support prometheus exporter 1
    
    * fix #38 support prometheus exporter frame
    
    * fix #38 support prometheus collect pull status metrics
---
 mqtt-cs/pom.xml                                    |   4 +
 .../rocketmq/mqtt/cs/config/ConnectConf.java       |  18 +
 .../rocketmq/mqtt/cs/session/loop/QueueCache.java  |  18 +
 .../rocketmq/mqtt/cs/starter/ExporterServer.java   |  53 +++
 mqtt-ds/pom.xml                                    |   4 +
 .../mqtt/ds/store/LmqQueueStoreManager.java        |   6 +
 mqtt-exporter/pom.xml                              |  36 ++
 .../rocketmq/mqtt/exporter/MqttExporter.java       |  51 ++
 .../exporter/collector/MetricsBuilderFactory.java  |  65 +++
 .../exporter/collector/MqttMetricsCollector.java   | 173 +++++++
 .../mqtt/exporter/collector/MqttMetricsInfo.java   |  90 ++++
 .../mqtt/exporter/collector/SubSystem.java         |  40 ++
 .../exporter/exception/PrometheusException.java    |  34 ++
 .../mqtt/exporter/http/BackedFileOutputStream.java | 273 +++++++++++
 .../mqtt/exporter/http/MqttHTTPServer.java         | 513 +++++++++++++++++++++
 pom.xml                                            |  28 ++
 16 files changed, 1406 insertions(+)

diff --git a/mqtt-cs/pom.xml b/mqtt-cs/pom.xml
index 582771e..d72a4f0 100644
--- a/mqtt-cs/pom.xml
+++ b/mqtt-cs/pom.xml
@@ -18,6 +18,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
+            <artifactId>mqtt-exporter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
         </dependency>
         <dependency>
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
index 955e865..df28bf8 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
@@ -45,6 +45,8 @@ public class ConnectConf {
     private int pullBatchSize = 32;
     private int rpcListenPort = 7001;
     private int retryIntervalSeconds = 3;
+    private int exporterPort = 9090;
+    private boolean enablePrometheus = false;
 
     public ConnectConf() throws IOException {
         ClassPathResource classPathResource = new ClassPathResource(CONF_FILE_NAME);
@@ -179,4 +181,20 @@ public class ConnectConf {
     public void setRetryIntervalSeconds(int retryIntervalSeconds) {
         this.retryIntervalSeconds = retryIntervalSeconds;
     }
+
+    /**
+     * Returns the port handed to the Prometheus metrics exporter when it is started.
+     *
+     * @return the exporter port; the field defaults to 9090
+     */
+    public int getExporterPort() {
+        return exporterPort;
+    }
+
+    /**
+     * Sets the port handed to the Prometheus metrics exporter when it is started.
+     *
+     * @param exporterPort the exporter port
+     */
+    public void setExporterPort(int exporterPort) {
+        this.exporterPort = exporterPort;
+    }
+
+    /**
+     * Tells whether the Prometheus metrics exporter should be started.
+     *
+     * @return {@code true} if the exporter is enabled; the field defaults to {@code false}
+     */
+    public boolean isEnablePrometheus() {
+        return enablePrometheus;
+    }
+
+    /**
+     * Enables or disables the Prometheus metrics exporter.
+     *
+     * @param enablePrometheus {@code true} to have the exporter started
+     */
+    public void setEnablePrometheus(boolean enablePrometheus) {
+        this.enablePrometheus = enablePrometheus;
+    }
 }
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
index d009055..89c6a71 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.mqtt.common.model.Subscription;
 import org.apache.rocketmq.mqtt.common.util.StatUtil;
 import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 import org.apache.rocketmq.mqtt.cs.session.Session;
+import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -182,6 +183,7 @@ public class QueueCache {
                                         CompletableFuture<PullResult> callBackResult) {
         if (subscription.isP2p() || subscription.isRetry()) {
             StatUtil.addPv("NotPullCache", 1);
+            collectorPullCacheStatus("NotPullCache");
             CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
             callbackResult(pullResult, callBackResult);
             return DONE;
@@ -189,6 +191,7 @@ public class QueueCache {
         CacheEntry cacheEntry = cache.getIfPresent(queue);
         if (cacheEntry == null) {
             StatUtil.addPv("NoPullCache", 1);
+            // Report the same status as the StatUtil counter above; "NotPullCache" is the p2p/retry status.
+            collectorPullCacheStatus("NoPullCache");
             CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
             callbackResult(pullResult, callBackResult);
             return DONE;
@@ -196,6 +199,7 @@ public class QueueCache {
         if (cacheEntry.loading.get()) {
             if (System.currentTimeMillis() - cacheEntry.startLoadingT > 1000) {
                 StatUtil.addPv("LoadPullCacheTimeout", 1);
+                collectorPullCacheStatus("LoadPullCacheTimeout");
                 CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
                 callbackResult(pullResult, callBackResult);
                 return DONE;
@@ -206,10 +210,12 @@ public class QueueCache {
         List<Message> cacheMsgList = cacheEntry.messageList;
         if (cacheMsgList.isEmpty()) {
             if (loadEvent.get(queue) != null) {
+                collectorPullCacheStatus("EmptyPullCacheLATER");
                 StatUtil.addPv("EmptyPullCacheLATER", 1);
                 return LATER;
             }
             StatUtil.addPv("EmptyPullCache", 1);
+            collectorPullCacheStatus("EmptyPullCache");
             CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
             callbackResult(pullResult, callBackResult);
             return DONE;
@@ -217,6 +223,7 @@ public class QueueCache {
 
         if (queueOffset.getOffset() < cacheMsgList.get(0).getOffset()) {
             StatUtil.addPv("OutPullCache", 1);
+            collectorPullCacheStatus("OutPullCache");
             CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
             callbackResult(pullResult, callBackResult);
             return DONE;
@@ -235,9 +242,11 @@ public class QueueCache {
         if (resultMsgs.isEmpty()) {
             if (loadEvent.get(queue) != null) {
                 StatUtil.addPv("PullCacheLATER", 1);
+                collectorPullCacheStatus("PullCacheLATER");
                 return LATER;
             }
             StatUtil.addPv("OutPullCache2", 1);
+            collectorPullCacheStatus("OutPullCache2");
             CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
             callbackResult(pullResult, callBackResult);
             return DONE;
@@ -246,12 +255,21 @@ public class QueueCache {
         pullResult.setMessageList(resultMsgs);
         callBackResult.complete(pullResult);
         StatUtil.addPv("PullFromCache", 1);
+        collectorPullCacheStatus("PullFromCache");
         if (loadEvent.get(queue) != null) {
             return LATER;
         }
         return DONE;
     }
 
+    /**
+     * Reports one occurrence of the given pull-cache status to the Prometheus collector.
+     * <p>
+     * Metrics are best-effort: any failure is logged and swallowed so that it
+     * cannot interfere with the message pull that triggered it.
+     *
+     * @param pullCacheStatus status name, passed on as the label value of the metric
+     */
+    private void collectorPullCacheStatus(String pullCacheStatus) {
+        try {
+            MqttMetricsCollector.collectPullCacheStatusTps(1, pullCacheStatus);
+        } catch (Throwable e) {
+            // Throwable on purpose: a metrics failure of any kind must not break pulling.
+            logger.error("collect prometheus error, pullCacheStatus={}", pullCacheStatus, e);
+        }
+    }
+
     private void loadCache(boolean isFirst, String firstTopic, Queue queue, QueueOffset queueOffset, int count,
                            QueueEvent event) {
         loadStatus.put(queue, true);
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/ExporterServer.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/ExporterServer.java
new file mode 100644
index 0000000..e5749d8
--- /dev/null
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/ExporterServer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.starter;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+
+import org.apache.rocketmq.mqtt.common.util.HostInfo;
+import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
+import org.apache.rocketmq.mqtt.exporter.MqttExporter;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ExporterServer {
+    private static final HostInfo HOST_INFO = HostInfo.getInstall();
+    private static final String NAMESPACE = "mqtt";
+    @Resource
+    private ConnectConf connectConf;
+
+    private MqttExporter mqttExporter;
+
+    @PostConstruct
+    public void init() throws Exception {
+        if (connectConf.isEnablePrometheus()) {
+            this.mqttExporter = new MqttExporter(NAMESPACE, HOST_INFO.getName(), HOST_INFO.getAddress(),
+                connectConf.getExporterPort());
+            mqttExporter.start();
+        }
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        if (this.mqttExporter != null) {
+            this.mqttExporter.shutdown();
+        }
+    }
+}
diff --git a/mqtt-ds/pom.xml b/mqtt-ds/pom.xml
index 7aac121..768c508 100644
--- a/mqtt-ds/pom.xml
+++ b/mqtt-ds/pom.xml
@@ -17,6 +17,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
+            <artifactId>mqtt-exporter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
         </dependency>
         <dependency>
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index f521c2b..f4f1c63 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -55,6 +55,7 @@ import org.apache.rocketmq.mqtt.common.util.TopicUtils;
 import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
 import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
 import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
+import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -218,6 +219,11 @@ public class LmqQueueStoreManager implements LmqQueueStore {
                     result.complete(toLmqPullResult(queue, pullResult));
                     StatUtil.addInvoke("lmqPull", System.currentTimeMillis() - start);
                     StatUtil.addPv(pullResult.getPullStatus().name(), 1);
+                    try {
+                        MqttMetricsCollector.collectPullStatusTps(1, pullResult.getPullStatus().name());
+                    } catch (Throwable e) {
+                        logger.error("collect prometheus error", e);
+                    }
                 }
 
                 @Override
diff --git a/mqtt-exporter/pom.xml b/mqtt-exporter/pom.xml
new file mode 100644
index 0000000..07fd0eb
--- /dev/null
+++ b/mqtt-exporter/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-mqtt</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>mqtt-exporter</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_hotspot</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_httpserver</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/MqttExporter.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/MqttExporter.java
new file mode 100644
index 0000000..ebc6a36
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/MqttExporter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter;
+
+import io.prometheus.client.hotspot.DefaultExports;
+import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts and stops metrics exporting: registers the JVM default exports and
+ * initializes {@link MqttMetricsCollector} with the identity of this host.
+ */
+public class MqttExporter {
+    protected static final Logger LOG = LoggerFactory.getLogger(MqttExporter.class);
+
+    private final String nameSpace;
+    private final String hostName;
+    private final String hostIp;
+    private final int exporterPort;
+
+    /**
+     * @param nameSpace    namespace passed to the metrics collector
+     * @param hostName     name of this host, passed to the metrics collector
+     * @param hostIp       address of this host, passed to the metrics collector
+     * @param exporterPort port passed to the metrics collector
+     */
+    public MqttExporter(String nameSpace, String hostName, String hostIp, int exporterPort) {
+        this.exporterPort = exporterPort;
+        this.hostIp = hostIp;
+        this.hostName = hostName;
+        this.nameSpace = nameSpace;
+    }
+
+    /**
+     * Registers the JVM default exports and initializes the metrics collector.
+     *
+     * @throws Exception if the collector cannot be initialized
+     */
+    public void start() throws Exception {
+        // TODO: decide whether the JVM default exports should be started by default
+        DefaultExports.initialize();
+        MqttMetricsCollector.initialize(nameSpace, hostName, hostIp, exporterPort);
+        LOG.info("metrics exporter start success");
+    }
+
+    /**
+     * Shuts the metrics collector down.
+     */
+    public void shutdown() {
+        MqttMetricsCollector.shutdown();
+    }
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MetricsBuilderFactory.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MetricsBuilderFactory.java
new file mode 100644
index 0000000..476226d
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MetricsBuilderFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.collector;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
+import io.prometheus.client.SimpleCollector.Builder;
+
+/**
+ * Creates Prometheus collector builders from {@link MqttMetricsInfo} definitions.
+ */
+public class MetricsBuilderFactory {
+
+    /**
+     * Returns a builder matching the collector type of the given metric, with
+     * name, help text and label names (plus buckets for histograms) applied.
+     *
+     * @param mqttMetricsInfo the metric definition
+     * @return the configured builder, or {@code null} when the type is not
+     *         COUNTER, GAUGE or HISTOGRAM
+     */
+    public static Builder newCollectorBuilder(MqttMetricsInfo mqttMetricsInfo) {
+        switch (mqttMetricsInfo.getType()) {
+            case COUNTER:
+                return newCounterBuilder(mqttMetricsInfo);
+            case GAUGE:
+                return newGaugeBuilder(mqttMetricsInfo);
+            case HISTOGRAM:
+                return newHistogramBuilder(mqttMetricsInfo);
+            default:
+                return null;
+        }
+    }
+
+    /** Builds a counter builder from the metric definition. */
+    private static Builder newCounterBuilder(MqttMetricsInfo info) {
+        return Counter.build()
+            .name(info.getName())
+            .help(info.getHelp())
+            .labelNames(info.getLabelNames());
+    }
+
+    /** Builds a gauge builder from the metric definition. */
+    private static Builder newGaugeBuilder(MqttMetricsInfo info) {
+        return Gauge.build()
+            .name(info.getName())
+            .help(info.getHelp())
+            .labelNames(info.getLabelNames());
+    }
+
+    /** Builds a histogram builder from the metric definition, including its buckets. */
+    private static Builder newHistogramBuilder(MqttMetricsInfo info) {
+        return Histogram.build()
+            .name(info.getName())
+            .help(info.getHelp())
+            .labelNames(info.getLabelNames())
+            .buckets(info.getBuckets());
+    }
+
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
new file mode 100644
index 0000000..820f71b
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.collector;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.Type;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
+import io.prometheus.client.SimpleCollector.Builder;
+import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
+import org.apache.rocketmq.mqtt.exporter.http.MqttHTTPServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MqttMetricsCollector {
+    public static final CollectorRegistry COLLECTOR_REGISTRY = CollectorRegistry.defaultRegistry;
+    private static final Logger LOG = LoggerFactory.getLogger(MqttMetricsCollector.class);
+    private static final Map<Type, Map<MqttMetricsInfo, Collector>> ALL_TYPE_COLLECTORS = new ConcurrentHashMap<>();
+    private static volatile boolean initialized = false;
+    private static String namespace;
+    private static String hostName;
+    private static String hostIp;
+    private static MqttHTTPServer httpServer;
+
+    public static synchronized void initialize(String nameSpace, String hostName, String hostIp, int exporterPort) throws IOException {
+        if (!initialized) {
+            MqttMetricsCollector.namespace = nameSpace;
+            MqttMetricsCollector.hostName = hostName;
+            MqttMetricsCollector.hostIp = hostIp;
+            register(MqttMetricsInfo.values());
+            httpServer = new MqttHTTPServer(new InetSocketAddress(exporterPort), MqttMetricsCollector.COLLECTOR_REGISTRY, false);
+            initialized = true;
+        }
+    }
+
+    private static void register(MqttMetricsInfo[] mqttMetricsInfos) {
+        for (MqttMetricsInfo metricsInfo : mqttMetricsInfos) {
+            register(metricsInfo);
+        }
+    }
+
+    public static void register(MqttMetricsInfo metricsInfo) {
+        Map<MqttMetricsInfo, Collector> mqttMetricsInfoCollectorTypeMap = ALL_TYPE_COLLECTORS.get(metricsInfo.getType());
+        if (mqttMetricsInfoCollectorTypeMap == null) {
+            mqttMetricsInfoCollectorTypeMap = new ConcurrentHashMap<>();
+            Map<MqttMetricsInfo, Collector> oldMap = ALL_TYPE_COLLECTORS.putIfAbsent(metricsInfo.getType(),
+                mqttMetricsInfoCollectorTypeMap);
+
+            if (oldMap != null) {
+                mqttMetricsInfoCollectorTypeMap = oldMap;
+            }
+        }
+        if (mqttMetricsInfoCollectorTypeMap.get(metricsInfo) == null) {
+            Builder builder = MetricsBuilderFactory.newCollectorBuilder(metricsInfo);
+            if (builder == null) {
+                return;
+            }
+            mqttMetricsInfoCollectorTypeMap.putIfAbsent(metricsInfo, buildNameSpace(builder, metricsInfo).register(COLLECTOR_REGISTRY));
+        }
+    }
+
+    public static void unRegister(MqttMetricsInfo metricsInfo) {
+        Map<MqttMetricsInfo, Collector> mqttMetricsInfoCollectorTypeMap = ALL_TYPE_COLLECTORS.get(metricsInfo.getType());
+        if (mqttMetricsInfoCollectorTypeMap == null) {
+            return;
+        }
+        Collector collector = mqttMetricsInfoCollectorTypeMap.get(metricsInfo);
+        if (collector == null) {
+            return;
+        }
+        COLLECTOR_REGISTRY.unregister(collector);
+        mqttMetricsInfoCollectorTypeMap.remove(metricsInfo, collector);
+
+    }
+
+    private static Builder buildNameSpace(Builder builder, MqttMetricsInfo mqttMetricsInfo) {
+        return builder.namespace(namespace).subsystem(mqttMetricsInfo.getSubSystem().getValue());
+    }
+
+    private static String[] paddingClusterLabelValues(String... labelValues) {
+        String[] newLabelValues = new String[labelValues.length + 2];
+        int index = 0;
+        newLabelValues[index++] = hostName;
+        newLabelValues[index++] = hostIp;
+        for (int i = 0; i < labelValues.length; i++, index++) {
+            newLabelValues[index] = labelValues[i];
+        }
+        return newLabelValues;
+    }
+
+    private static void collect(MqttMetricsInfo mqttMetricsInfo, long val, String... labels) throws PrometheusException {
+        Map<MqttMetricsInfo, Collector> mqttMetricsInfoCollectorTypeMap = ALL_TYPE_COLLECTORS.get(mqttMetricsInfo.getType());
+        if (mqttMetricsInfoCollectorTypeMap == null) {
+            throw new PrometheusException("mqttMetricsInfo unregistered or collector type not support: " + mqttMetricsInfo);
+        }
+        Collector collector = mqttMetricsInfoCollectorTypeMap.get(mqttMetricsInfo);
+        if (collector == null) {
+            throw new PrometheusException("mqttMetricsInfo unregistered or collector type not support: " + mqttMetricsInfo);
+        }
+
+        try {
+            if (mqttMetricsInfo.getType() == Type.COUNTER) {
+                Counter counter = (Counter)collector;
+                counter.labels(paddingClusterLabelValues(labels)).inc(val);
+            } else if (mqttMetricsInfo.getType() == Type.GAUGE) {
+                Gauge gauge = (Gauge)collector;
+                gauge.labels(paddingClusterLabelValues(labels)).set(val);
+            } else if (mqttMetricsInfo.getType() == Type.HISTOGRAM) {
+                Histogram histogram = (Histogram)collector;
+                histogram.labels(paddingClusterLabelValues(labels)).observe(val);
+            }
+        } catch (Exception e) {
+            LOG.error("collect metrics exception.", labels2String(labels), val, e);
+            throw new PrometheusException("collect metrics exception", e);
+        }
+    }
+
+    public static void collectDemoTps(long val, String... labels) throws PrometheusException {
+        collect(MqttMetricsInfo.DEMO_TPS, val, labels);
+    }
+
+    public static void collectDemoGuage(long val, String... labels) throws PrometheusException {
+        collect(MqttMetricsInfo.DEMO_GAUGE, val, labels);
+    }
+
+    public static void collectDemoLatency(long val, String... labels) throws PrometheusException {
+        collect(MqttMetricsInfo.DEMO_LATENCY, val, labels);
+    }
+
+    public static void collectPullStatusTps(long val, String... labels) throws PrometheusException {
+        collect(MqttMetricsInfo.PULL_STATUS_TPS, val, labels);
+    }
+
+    public static void collectPullCacheStatusTps(long val, String... labels) throws PrometheusException {
+        collect(MqttMetricsInfo.PULL_CACHE_STATUS_TPS, val, labels);
+    }
+
+    private static String labels2String(String... labels) {
+        StringBuilder sb = new StringBuilder(128);
+        for (String label : labels) {
+            sb.append(label).append(";");
+        }
+        return sb.toString();
+    }
+
+    public static void shutdown() {
+        if (httpServer != null) {
+            httpServer.close();
+        }
+    }
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
new file mode 100644
index 0000000..f3b2e14
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.collector;
+
+import java.util.Arrays;
+
+import io.prometheus.client.Collector.Type;
+
+/**
+ * Definitions of the metrics exported by the MQTT service: collector type,
+ * owning subsystem, metric name, help text, histogram buckets and label names.
+ * <p>
+ * Every definition starts its label names with {@code hostName} and
+ * {@code hostIp}; any further names are the metric-specific labels.
+ */
+public enum MqttMetricsInfo {
+    DEMO_TPS(Type.COUNTER, SubSystem.CS, "tps_total", "demo tps.", null,
+        "hostName", "hostIp"),
+    DEMO_GAUGE(Type.GAUGE, SubSystem.DS, "demo_gauge", "demo gauge status.",
+        null,
+        "hostName", "hostIp"),
+    DEMO_LATENCY(Type.HISTOGRAM, SubSystem.DS, "demo_latency", "latency in microsecond.",
+        new double[] {100, 300, 500, 1000, 3000, 5000, 10000, 50000},
+        "hostName", "hostIp"),
+    PULL_STATUS_TPS(Type.COUNTER, SubSystem.DS, "pull_status_tps_total", "ds pull msg status tps.", null,
+        "hostName", "hostIp", "pullStatus"),
+    // Help text names the cs subsystem, matching SubSystem.CS (it previously said "ds").
+    PULL_CACHE_STATUS_TPS(Type.COUNTER, SubSystem.CS, "pull_cache_status_tps_total", "cs pull cache status tps.", null,
+        "hostName", "hostIp", "pullCacheStatus");
+
+
+    private final Type type;
+    private final SubSystem subSystem;
+    private final String name;
+    private final String help;
+    private final double[] buckets;
+    private final String[] labelNames;
+
+    /**
+     * @param type       collector type of the metric
+     * @param subSystem  subsystem the metric belongs to
+     * @param name       metric name
+     * @param help       help text of the metric
+     * @param buckets    histogram buckets, or {@code null} if the metric has none
+     * @param labelNames label names of the metric
+     */
+    MqttMetricsInfo(Type type, SubSystem subSystem, String name, String help, double[] buckets, String... labelNames) {
+        this.type = type;
+        this.subSystem = subSystem;
+        this.name = name;
+        this.help = help;
+        this.buckets = buckets;
+        this.labelNames = labelNames;
+    }
+
+    /** @return the collector type of the metric */
+    public Type getType() {
+        return type;
+    }
+
+    /** @return the metric name */
+    public String getName() {
+        return name;
+    }
+
+    /** @return the help text of the metric */
+    public String getHelp() {
+        return help;
+    }
+
+    /**
+     * @return a copy of the histogram buckets, or {@code null} if none are defined;
+     *         a copy is returned so callers cannot modify the shared enum state
+     */
+    public double[] getBuckets() {
+        return buckets == null ? null : buckets.clone();
+    }
+
+    /**
+     * @return a copy of the label names; a copy is returned so callers cannot
+     *         modify the shared enum state
+     */
+    public String[] getLabelNames() {
+        return labelNames == null ? null : labelNames.clone();
+    }
+
+    /** @return the subsystem the metric belongs to */
+    public SubSystem getSubSystem() {
+        return subSystem;
+    }
+
+    @Override
+    public String toString() {
+        return "MqttMetricsInfo{" +
+            "type=" + type +
+            ", subSystem=" + subSystem +
+            ", name='" + name + '\'' +
+            ", help='" + help + '\'' +
+            ", buckets=" + Arrays.toString(buckets) +
+            ", labelNames=" + Arrays.toString(labelNames) +
+            '}';
+    }
+}
\ No newline at end of file
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/SubSystem.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/SubSystem.java
new file mode 100644
index 0000000..73adf64
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/SubSystem.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.collector;
+
+/**
+ * Sub-systems a metric can be attributed to; every constant carries a short string value.
+ */
+public enum SubSystem {
+    DS("ds"),
+    CS("cs");
+
+    /** Short identifier passed to the constructor. */
+    private final String value;
+
+    SubSystem(String value) {
+        this.value = value;
+    }
+
+    /**
+     * Returns the short identifier of this sub-system.
+     */
+    public String getValue() {
+        return value;
+    }
+
+    /**
+     * Renders the constant as {@code SubSystem{value='...'}}.
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("SubSystem{");
+        text.append("value='").append(value).append('\'');
+        return text.append('}').toString();
+    }
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/exception/PrometheusException.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/exception/PrometheusException.java
new file mode 100644
index 0000000..fb9a8b8
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/exception/PrometheusException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.exception;
+
+/**
+ * Checked exception of the exporter module.
+ */
+public class PrometheusException extends Exception {
+
+    // Exception is Serializable; pin the version instead of relying on the compiler-derived default.
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message detail message
+     */
+    public PrometheusException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message detail message
+     * @param cause   underlying failure, kept as the cause of this exception
+     */
+    public PrometheusException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause underlying failure, kept as the cause of this exception
+     */
+    public PrometheusException(Throwable cause) {
+        super(cause);
+    }
+
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/BackedFileOutputStream.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/BackedFileOutputStream.java
new file mode 100644
index 0000000..df4506c
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/BackedFileOutputStream.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.http;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Objects;
+
+import com.google.common.io.ByteSource;
+
+/**
+ * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering
+ * once the data reaches a configurable size.
+ *
+ * <p>Temporary files created by this stream may live in the local filesystem until either:
+ *
+ * <ul>
+ * <li>{@link #reset} is called (removing the data in this stream and deleting the file), or...
+ * <li>the {@link #asByteSource} view of this stream is finalized during garbage collection,
+ * <strong>AND</strong> this stream was constructed with {@code resetOnFinalize} set to
+ * {@code true} (the {@linkplain #BackedFileOutputStream(int) 1-arg constructor} never does that)
+ * </ul>
+ *
+ * <p>Every method that reads or changes the buffered data is {@code synchronized} on this instance.
+ */
+public class BackedFileOutputStream extends OutputStream {
+    private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+    private final File parentDirectory;
+    private final int fileThreshold;
+    private final boolean resetOnFinalize;
+    private final ByteSource source;
+
+    // Current sink: either 'memory' or a FileOutputStream on 'file'.
+    private OutputStream out;
+    // In-memory buffer; null once the data has been moved to 'file'.
+    private MemoryOutput memory;
+    // Temp file holding the data; null while the data is still in memory.
+    private File file;
+
+    /**
+     * Creates a new instance that uses the given file threshold, and does
+     * not reset the data when the {@link ByteSource} returned by
+     * {@link #asByteSource} is finalized. No parent directory is configured.
+     *
+     * @param fileThreshold the number of bytes before the stream should
+     *                      switch to buffering to a file
+     */
+    public BackedFileOutputStream(int fileThreshold) {
+        this(fileThreshold, false, null);
+    }
+
+    /**
+     * Creates a new instance that uses the given file threshold, and
+     * optionally resets the data when the {@link ByteSource} returned
+     * by {@link #asByteSource} is finalized.
+     *
+     * @param fileThreshold   the number of bytes before the stream should
+     *                        switch to buffering to a file
+     * @param resetOnFinalize if true, the {@link #reset} method will
+     *                        be called when the {@link ByteSource} returned by {@link
+     *                        #asByteSource} is finalized
+     * @param parentDirectory directory passed to {@link File#createTempFile(String, String, File)}
+     *                        when the data is moved to a file; may be {@code null}
+     */
+    public BackedFileOutputStream(int fileThreshold, boolean resetOnFinalize, File parentDirectory) {
+        this.parentDirectory = parentDirectory;
+        this.fileThreshold = fileThreshold;
+        this.resetOnFinalize = resetOnFinalize;
+        this.memory = new MemoryOutput();
+        this.out = memory;
+
+        if (resetOnFinalize) {
+            this.source = new ByteSource() {
+                @Override
+                public InputStream openStream() throws IOException {
+                    return openInputStream();
+                }
+
+                @Override
+                protected void finalize() {
+                    try {
+                        reset();
+                    } catch (Throwable t) {
+                        t.printStackTrace(System.err);
+                    }
+                }
+            };
+        } else {
+            this.source = new ByteSource() {
+                @Override
+                public InputStream openStream() throws IOException {
+                    return openInputStream();
+                }
+            };
+        }
+    }
+
+    /**
+     * Returns the file holding the data (possibly null).
+     */
+    public synchronized File getFile() {
+        return file;
+    }
+
+    /**
+     * Returns a readable {@link ByteSource} view of the data that has been
+     * written to this stream.
+     */
+    public ByteSource asByteSource() {
+        return source;
+    }
+
+    /**
+     * Opens a stream over the buffered data: the temp file when one exists, otherwise the bytes
+     * currently held in memory.
+     */
+    private synchronized InputStream openInputStream() throws IOException {
+        if (file != null) {
+            return new FileInputStream(file);
+        } else {
+            return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
+        }
+    }
+
+    /**
+     * Calls {@link #close} if not already closed, and then resets this
+     * object back to its initial state, for reuse. If data was buffered
+     * to a file, it will be deleted.
+     *
+     * @throws IOException if an I/O error occurred while deleting the file buffer
+     */
+    public synchronized void reset() throws IOException {
+        try {
+            close();
+        } finally {
+            if (memory == null) {
+                memory = new MemoryOutput();
+            } else {
+                memory.reset();
+            }
+            out = memory;
+            if (file != null) {
+                File deleteMe = file;
+                file = null;
+                if (!deleteMe.delete()) {
+                    throw new IOException("Could not delete: " + deleteMe);
+                }
+            }
+        }
+    }
+
+    @Override
+    public synchronized void write(int b) throws IOException {
+        update(1);
+        out.write(b);
+    }
+
+    @Override
+    public synchronized void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public synchronized void write(byte[] b, int off, int len)
+        throws IOException {
+        update(len);
+        out.write(b, off, len);
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        out.close();
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
+        out.flush();
+    }
+
+    /**
+     * Checks if writing {@code len} bytes would go over threshold, and
+     * switches to file buffering if so. Only called from the synchronized write methods.
+     *
+     * @throws IOException if the temp file cannot be created or the buffered bytes cannot be
+     *                     copied into it; in the latter case the temp file is deleted again
+     */
+    private void update(int len) throws IOException {
+        if (memory == null || memory.getCount() + len <= fileThreshold) {
+            return;
+        }
+
+        File temp = File.createTempFile("FileBackedOutputStream", null, parentDirectory);
+        if (resetOnFinalize) {
+            // Finalizers are not guaranteed to be called on system shutdown;
+            // this is insurance.
+            temp.deleteOnExit();
+        }
+
+        FileOutputStream transfer = null;
+        try {
+            transfer = new FileOutputStream(temp);
+            transfer.write(memory.getBuffer(), 0, memory.getCount());
+            transfer.flush();
+        } catch (IOException e) {
+            // Release the descriptor before deleting; otherwise it leaks and the delete can fail.
+            if (transfer != null) {
+                try {
+                    transfer.close();
+                } catch (IOException ignored) {
+                    // The copy failure in 'e' is the error worth reporting.
+                }
+            }
+            temp.delete();
+            throw e;
+        }
+
+        // We've successfully transferred the data; switch to writing to file
+        out = transfer;
+        file = temp;
+        memory = null;
+    }
+
+    /**
+     * Returns the number of bytes currently buffered: the length of the temp file when one exists,
+     * otherwise the number of bytes held in memory.
+     */
+    public synchronized long size() {
+        if (file != null) {
+            return file.length();
+        } else {
+            return memory.getCount();
+        }
+    }
+
+    /**
+     * Copies everything buffered so far to the given stream. The target is neither flushed nor
+     * closed.
+     *
+     * @param out stream receiving the data (not to be confused with the {@code out} field, which
+     *            this parameter shadows)
+     * @throws IOException if reading the temp file or writing to {@code out} fails, or if the
+     *                     number of bytes copied differs from the temp file length
+     */
+    public synchronized void writeTo(OutputStream out) throws IOException {
+        if (file == null) {
+            out.write(memory.getBuffer(), 0, memory.getCount());
+            return;
+        }
+        // try-with-resources keeps a copy failure as the primary exception if close() fails too.
+        try (FileInputStream fileInputStream = new FileInputStream(file)) {
+            if (transferTo(fileInputStream, out) != file.length()) {
+                throw new IOException("Bug in BackedFileOutputStream");
+            }
+        }
+    }
+
+    /**
+     * Copies the file stream to {@code out} in {@link #DEFAULT_BUFFER_SIZE} chunks.
+     *
+     * @return the number of bytes copied
+     */
+    private long transferTo(FileInputStream fileInputStream, OutputStream out) throws IOException {
+        Objects.requireNonNull(out, "out");
+        long transferred = 0;
+        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+        int read;
+        while ((read = fileInputStream.read(buffer, 0, DEFAULT_BUFFER_SIZE)) >= 0) {
+            out.write(buffer, 0, read);
+            transferred += read;
+        }
+        return transferred;
+    }
+
+    /**
+     * ByteArrayOutputStream that exposes its internals.
+     */
+    private static class MemoryOutput extends ByteArrayOutputStream {
+        byte[] getBuffer() {
+            return buf;
+        }
+
+        int getCount() {
+            return count;
+        }
+    }
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/MqttHTTPServer.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/MqttHTTPServer.java
new file mode 100644
index 0000000..cc97527
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/MqttHTTPServer.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.http;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.GZIPOutputStream;
+
+import com.sun.net.httpserver.Authenticator;
+import com.sun.net.httpserver.HttpContext;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Predicate;
+import io.prometheus.client.SampleNameFilter;
+import io.prometheus.client.Supplier;
+import io.prometheus.client.exporter.SampleNameFilterSupplier;
+import io.prometheus.client.exporter.common.TextFormat;
+
+/**
+ * Expose Prometheus metrics using a plain Java HttpServer.
+ * <p>
+ * Example Usage:
+ * <pre>
+ * {@code
+ * MqttHTTPServer server = new MqttHTTPServer(1234);
+ * }
+ * </pre>
+ */
+public class MqttHTTPServer implements Closeable {
+    // Parent directory handed to BackedFileOutputStream (see LocalByteArray) for its temp files.
+    private static final File TEMP_DIR;
+
+    static {
+        // The property key is literally "--logging.path", leading dashes included.
+        // NOTE(review): nothing in this class creates the directory - confirm it exists at runtime.
+        String temDirName = System.getProperty("--logging.path", "/home/admin/logs/mqtt-exporter/");
+        TEMP_DIR = new File(temDirName);
+    }
+
+    static {
+        // Supply defaults for two "sun.net.httpserver.*" system properties, but only when they are
+        // not already set. Presumably these are the JDK HTTP server's request/response time limits;
+        // the unit is not visible here - TODO confirm.
+        if (!System.getProperties().containsKey("sun.net.httpserver.maxReqTime")) {
+            System.setProperty("sun.net.httpserver.maxReqTime", "60");
+        }
+
+        if (!System.getProperties().containsKey("sun.net.httpserver.maxRspTime")) {
+            System.setProperty("sun.net.httpserver.maxRspTime", "600");
+        }
+    }
+
+    protected final HttpServer server;
+    protected final ExecutorService executorService;
+
+    /**
+     * Start an HTTP server serving Prometheus metrics from the given registry using the given {@link HttpServer}.
+     * The {@code httpServer} is expected to already be bound to an address. No sample name filter and no
+     * authenticator are configured.
+     *
+     * @param httpServer server to register the handlers on and start
+     * @param registry   registry whose metrics are served
+     * @param daemon     daemon flag applied to the threads created for the server
+     */
+    public MqttHTTPServer(HttpServer httpServer, CollectorRegistry registry, boolean daemon) throws IOException {
+        this(httpServer, registry, daemon, null, null);
+    }
+
+    /**
+     * Start an HTTP server serving Prometheus metrics from the given registry.
+     * The server is created with {@code HttpServer.create(addr, 3)}.
+     *
+     * @param addr     address to bind to
+     * @param registry registry whose metrics are served
+     * @param daemon   daemon flag applied to the threads created for the server
+     */
+    public MqttHTTPServer(InetSocketAddress addr, CollectorRegistry registry, boolean daemon) throws IOException {
+        this(HttpServer.create(addr, 3), registry, daemon);
+    }
+
+    /**
+     * Start an HTTP server serving Prometheus metrics from the given registry using non-daemon threads.
+     *
+     * @param addr     address to bind to
+     * @param registry registry whose metrics are served
+     */
+    public MqttHTTPServer(InetSocketAddress addr, CollectorRegistry registry) throws IOException {
+        this(addr, registry, false);
+    }
+
+    /**
+     * Start an HTTP server serving the default Prometheus registry.
+     *
+     * @param port   port to bind to; the address is built with {@code new InetSocketAddress(port)}
+     * @param daemon daemon flag applied to the threads created for the server
+     */
+    public MqttHTTPServer(int port, boolean daemon) throws IOException {
+        this(new InetSocketAddress(port), CollectorRegistry.defaultRegistry, daemon);
+    }
+
+    /**
+     * Start an HTTP server serving the default Prometheus registry using non-daemon threads.
+     *
+     * @param port port to bind to
+     */
+    public MqttHTTPServer(int port) throws IOException {
+        this(port, false);
+    }
+
+    /**
+     * Start an HTTP server serving the default Prometheus registry.
+     *
+     * @param host   host name used to build the bind address
+     * @param port   port to bind to
+     * @param daemon daemon flag applied to the threads created for the server
+     */
+    public MqttHTTPServer(String host, int port, boolean daemon) throws IOException {
+        this(new InetSocketAddress(host, port), CollectorRegistry.defaultRegistry, daemon);
+    }
+
+    /**
+     * Start an HTTP server serving the default Prometheus registry using non-daemon threads.
+     *
+     * @param host host name used to build the bind address
+     * @param port port to bind to
+     */
+    public MqttHTTPServer(String host, int port) throws IOException {
+        this(new InetSocketAddress(host, port), CollectorRegistry.defaultRegistry, false);
+    }
+
+    /**
+     * Wires the metric handler onto the given server, installs the worker pool and starts the server.
+     *
+     * <p>One handler instance is registered under {@code /}, {@code /metrics} and {@code /-/healthy};
+     * each context gets the authenticator when one is supplied. Requests run on a pool of two threads
+     * fed by a queue of 100 entries.
+     *
+     * @param httpServer               server that must already be bound to an address
+     * @param registry                 registry whose metrics are served
+     * @param daemon                   daemon flag applied to the threads created for the server
+     * @param sampleNameFilterSupplier optional supplier of a sample name filter; may be {@code null}
+     * @param authenticator            optional authenticator for all contexts; may be {@code null}
+     * @throws IllegalArgumentException if {@code httpServer} reports no address
+     */
+    private MqttHTTPServer(HttpServer httpServer, CollectorRegistry registry, boolean daemon,
+        Supplier<Predicate<String>> sampleNameFilterSupplier, Authenticator authenticator) {
+        if (httpServer.getAddress() == null) {
+            throw new IllegalArgumentException("HttpServer hasn't been bound to an address");
+        }
+
+        server = httpServer;
+        HttpHandler mHandler = new HTTPMetricHandler(registry, sampleNameFilterSupplier);
+        for (String path : new String[] {"/", "/metrics", "/-/healthy"}) {
+            HttpContext mContext = server.createContext(path, mHandler);
+            if (authenticator != null) {
+                mContext.setAuthenticator(authenticator);
+            }
+        }
+        executorService = new ThreadPoolExecutor(
+            2, 2, 60, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<Runnable>(100), NamedDaemonThreadFactory.defaultThreadFactory(daemon));
+        server.setExecutor(executorService);
+        start(daemon);
+    }
+
+    /**
+     * Tells whether the request lists {@code gzip} in any of its {@code Accept-Encoding} headers.
+     *
+     * <p>Each header value is split on commas and every trimmed entry is compared with
+     * {@code "gzip"} ignoring case.
+     *
+     * @param exchange exchange whose request headers are inspected
+     * @return {@code true} if an entry equals {@code gzip}, {@code false} otherwise or when the
+     *         header is absent
+     */
+    protected static boolean shouldUseCompression(HttpExchange exchange) {
+        List<String> acceptEncoding = exchange.getRequestHeaders().get("Accept-Encoding");
+        if (acceptEncoding != null) {
+            for (String headerValue : acceptEncoding) {
+                for (String candidate : headerValue.split(",")) {
+                    if ("gzip".equalsIgnoreCase(candidate.trim())) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Collects the values of all {@code name[]} parameters of a raw query string.
+     *
+     * <p>The query is split on {@code &}; pairs without {@code =} are ignored. Key and value are
+     * URL-decoded as UTF-8, and only pairs whose decoded key equals {@code name[]} contribute.
+     *
+     * @param query raw query string, may be {@code null}
+     * @return the decoded values as a set; empty when {@code query} is {@code null} or holds no
+     *         {@code name[]} parameter
+     */
+    protected static Set<String> parseQuery(String query) throws IOException {
+        Set<String> names = new HashSet<String>();
+        if (query == null) {
+            return names;
+        }
+        for (String pair : query.split("&")) {
+            int separator = pair.indexOf('=');
+            if (separator == -1) {
+                continue;
+            }
+            String key = URLDecoder.decode(pair.substring(0, separator), "UTF-8");
+            if (key.equals("name[]")) {
+                names.add(URLDecoder.decode(pair.substring(separator + 1), "UTF-8"));
+            }
+        }
+        return names;
+    }
+
+    /**
+     * Starts the HTTP server from a thread whose daemon flag equals {@code daemon}.
+     *
+     * <p>When the calling thread already has that flag the server is started directly. Otherwise a
+     * helper thread with the requested flag starts it and this method waits for the helper to finish.
+     *
+     * @param daemon daemon flag the starting thread must have
+     */
+    private void start(boolean daemon) {
+        if (daemon == Thread.currentThread().isDaemon()) {
+            server.start();
+            return;
+        }
+
+        FutureTask<Void> startTask = new FutureTask<Void>(new Runnable() {
+            @Override
+            public void run() {
+                server.start();
+            }
+        }, null);
+        Thread starter = NamedDaemonThreadFactory.defaultThreadFactory(daemon).newThread(startTask);
+        starter.start();
+        try {
+            startTask.get();
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Unexpected exception on starting HTTPSever", e);
+        } catch (InterruptedException e) {
+            // Only reachable if the calling thread was interrupted while waiting; nothing to do
+            // except restoring the interrupted flag for the caller.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Stop the HTTPServer.
+     *
+     * <p>The server is stopped with a delay argument of {@code 0}, then the executor that served the
+     * requests is shut down.
+     */
+    @Override
+    public void close() {
+        server.stop(0);
+        executorService.shutdown(); // Free any (parked/idle) threads in pool
+    }
+
+    /**
+     * Gets the port number.
+     *
+     * @return the port of the address reported by the underlying {@link HttpServer}
+     */
+    public int getPort() {
+        return server.getAddress().getPort();
+    }
+
+    /**
+     * Per-thread response buffer: each thread lazily gets its own {@link BackedFileOutputStream}
+     * that switches to a temp file under {@code TEMP_DIR} above 16 MiB and is not reset on
+     * finalization.
+     */
+    private static class LocalByteArray extends ThreadLocal<BackedFileOutputStream> {
+        @Override
+        protected BackedFileOutputStream initialValue() {
+            final int fileThresholdBytes = 16 << 20;
+            return new BackedFileOutputStream(fileThresholdBytes, false, TEMP_DIR);
+        }
+    }
+
+    /**
+     * Handles Metrics collections from the given registry.
+     *
+     * <p>The same handler also answers the {@code /-/healthy} context with a fixed text.
+     */
+    public static class HTTPMetricHandler implements HttpHandler {
+        private final static String HEALTHY_RESPONSE = "Exporter is Healthy.";
+        private final CollectorRegistry registry;
+        // One buffer per serving thread; it is reset before and after every request.
+        private final LocalByteArray response = new LocalByteArray();
+        private final Supplier<Predicate<String>> sampleNameFilterSupplier;
+
+        /**
+         * Creates a handler without a sample name filter.
+         *
+         * @param registry registry whose metrics are served
+         */
+        public HTTPMetricHandler(CollectorRegistry registry) {
+            this(registry, null);
+        }
+
+        /**
+         * @param registry                 registry whose metrics are served
+         * @param sampleNameFilterSupplier optional supplier of a sample name filter; may be {@code null}
+         */
+        public HTTPMetricHandler(CollectorRegistry registry, Supplier<Predicate<String>> sampleNameFilterSupplier) {
+            this.registry = registry;
+            this.sampleNameFilterSupplier = sampleNameFilterSupplier;
+        }
+
+        /**
+         * Renders the response into the per-thread buffer and then sends it, gzip-compressed when
+         * the client accepts that.
+         *
+         * <p>The exchange is closed and the buffer is reset even when rendering or sending fails, so
+         * a failed request does not leave an open exchange or a spilled temp file behind.
+         *
+         * @param t exchange to answer
+         * @throws IOException if rendering or sending the response fails
+         */
+        @Override
+        public void handle(HttpExchange t) throws IOException {
+            String query = t.getRequestURI().getRawQuery();
+            String contextPath = t.getHttpContext().getPath();
+            BackedFileOutputStream buffer = this.response.get();
+            buffer.reset();
+            try {
+                OutputStreamWriter osw = new OutputStreamWriter(buffer, Charset.forName("UTF-8"));
+                if ("/-/healthy".equals(contextPath)) {
+                    osw.write(HEALTHY_RESPONSE);
+                } else {
+                    String contentType = TextFormat.chooseContentType(t.getRequestHeaders().getFirst("Accept"));
+                    t.getResponseHeaders().set("Content-Type", contentType);
+                    Predicate<String> filter = sampleNameFilterSupplier == null ? null : sampleNameFilterSupplier.get();
+                    filter = SampleNameFilter.restrictToNamesEqualTo(filter, parseQuery(query));
+                    if (filter == null) {
+                        TextFormat.writeFormat(contentType, osw, registry.metricFamilySamples());
+                    } else {
+                        TextFormat.writeFormat(contentType, osw, registry.filteredMetricFamilySamples(filter));
+                    }
+                }
+
+                // Closing the writer flushes the encoded text into the buffer.
+                osw.close();
+
+                if (shouldUseCompression(t)) {
+                    t.getResponseHeaders().set("Content-Encoding", "gzip");
+                    t.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0);
+                    final GZIPOutputStream os = new GZIPOutputStream(t.getResponseBody());
+                    try {
+                        buffer.writeTo(os);
+                    } finally {
+                        os.close();
+                    }
+                } else {
+                    long contentLength = buffer.size();
+                    if (contentLength > 0) {
+                        t.getResponseHeaders().set("Content-Length", String.valueOf(contentLength));
+                    }
+                    // NOTE(review): for HEAD the headers go out with length -1 but the body is still
+                    // written below - confirm the JDK server tolerates that.
+                    if (t.getRequestMethod().equals("HEAD")) {
+                        contentLength = -1;
+                    }
+                    t.sendResponseHeaders(HttpURLConnection.HTTP_OK, contentLength);
+                    buffer.writeTo(t.getResponseBody());
+                }
+            } finally {
+                try {
+                    t.close();
+                } finally {
+                    buffer.reset();
+                }
+            }
+        }
+    }
+
+    /**
+     * Thread factory that renames the threads of a delegate factory to
+     * {@code prometheus-http-<pool>-<thread>} and applies a fixed daemon flag to them.
+     */
+    static class NamedDaemonThreadFactory implements ThreadFactory {
+        // Source of the pool number; every factory instance takes the next value.
+        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
+
+        private final int poolNumber = POOL_NUMBER.getAndIncrement();
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final ThreadFactory delegate;
+        private final boolean daemon;
+
+        NamedDaemonThreadFactory(ThreadFactory delegate, boolean daemon) {
+            this.delegate = delegate;
+            this.daemon = daemon;
+        }
+
+        /**
+         * Returns a factory that wraps {@link Executors#defaultThreadFactory()}.
+         *
+         * @param daemon daemon flag for every thread the factory creates
+         */
+        static ThreadFactory defaultThreadFactory(boolean daemon) {
+            ThreadFactory base = Executors.defaultThreadFactory();
+            return new NamedDaemonThreadFactory(base, daemon);
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            final Thread created = delegate.newThread(r);
+            final int sequence = threadNumber.getAndIncrement();
+            created.setName(String.format("prometheus-http-%d-%d", poolNumber, sequence));
+            created.setDaemon(daemon);
+            return created;
+        }
+    }
+
+    /**
+     * We keep the original constructors of {@link MqttHTTPServer} for compatibility, but new configuration
+     * parameters like {@code sampleNameFilter} must be configured using the Builder.
+     *
+     * <p>{@link #build()} does not modify the builder, so it may be called more than once.
+     */
+    public static class Builder {
+
+        private int port = 0;
+        private String hostname = null;
+        private InetAddress inetAddress = null;
+        private InetSocketAddress inetSocketAddress = null;
+        private HttpServer httpServer = null;
+        private CollectorRegistry registry = CollectorRegistry.defaultRegistry;
+        private boolean daemon = false;
+        private Predicate<String> sampleNameFilter;
+        private Supplier<Predicate<String>> sampleNameFilterSupplier;
+        private Authenticator authenticator;
+        private HttpsConfigurator httpsConfigurator;
+
+        /**
+         * Port to bind to. Must not be called together with {@link #withInetSocketAddress(InetSocketAddress)}
+         * or {@link #withHttpServer(HttpServer)}. Default is 0, indicating that a random port will be selected.
+         */
+        public Builder withPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /**
+         * Use this hostname to resolve the IP address to bind to. Must not be called together with
+         * {@link #withInetAddress(InetAddress)} or {@link #withInetSocketAddress(InetSocketAddress)}
+         * or {@link #withHttpServer(HttpServer)}.
+         * Default is empty, indicating that the HTTPServer binds to the wildcard address.
+         */
+        public Builder withHostname(String hostname) {
+            this.hostname = hostname;
+            return this;
+        }
+
+        /**
+         * Bind to this IP address. Must not be called together with {@link #withHostname(String)} or
+         * {@link #withInetSocketAddress(InetSocketAddress)} or {@link #withHttpServer(HttpServer)}.
+         * Default is empty, indicating that the HTTPServer binds to the wildcard address.
+         */
+        public Builder withInetAddress(InetAddress address) {
+            this.inetAddress = address;
+            return this;
+        }
+
+        /**
+         * Listen on this address. Must not be called together with {@link #withPort(int)},
+         * {@link #withHostname(String)}, {@link #withInetAddress(InetAddress)}, or {@link #withHttpServer(HttpServer)}.
+         */
+        public Builder withInetSocketAddress(InetSocketAddress address) {
+            this.inetSocketAddress = address;
+            return this;
+        }
+
+        /**
+         * Use this httpServer. The {@code httpServer} is expected to already be bound to an address.
+         * Must not be called together with {@link #withPort(int)}, or {@link #withHostname(String)},
+         * or {@link #withInetAddress(InetAddress)}, or {@link #withInetSocketAddress(InetSocketAddress)}.
+         */
+        public Builder withHttpServer(HttpServer httpServer) {
+            this.httpServer = httpServer;
+            return this;
+        }
+
+        /**
+         * By default, the {@link MqttHTTPServer} uses non-daemon threads. Set this to {@code true} to
+         * run the {@link MqttHTTPServer} with daemon threads.
+         */
+        public Builder withDaemonThreads(boolean daemon) {
+            this.daemon = daemon;
+            return this;
+        }
+
+        /**
+         * Optional: Only export time series where {@code sampleNameFilter.test(name)} returns true.
+         * <p>
+         * Use this if the sampleNameFilter remains the same throughout the lifetime of the HTTPServer.
+         * If the sampleNameFilter changes during runtime, use {@link #withSampleNameFilterSupplier(Supplier)}.
+         */
+        public Builder withSampleNameFilter(Predicate<String> sampleNameFilter) {
+            this.sampleNameFilter = sampleNameFilter;
+            return this;
+        }
+
+        /**
+         * Optional: Only export time series where {@code sampleNameFilter.test(name)} returns true.
+         * <p>
+         * Use this if the sampleNameFilter may change during runtime, like for example if you have a
+         * hot reload mechanism for your filter config.
+         * If the sampleNameFilter remains the same throughout the lifetime of the HTTPServer,
+         * use {@link #withSampleNameFilter(Predicate)} instead.
+         */
+        public Builder withSampleNameFilterSupplier(Supplier<Predicate<String>> sampleNameFilterSupplier) {
+            this.sampleNameFilterSupplier = sampleNameFilterSupplier;
+            return this;
+        }
+
+        /**
+         * Optional: Default is {@link CollectorRegistry#defaultRegistry}.
+         */
+        public Builder withRegistry(CollectorRegistry registry) {
+            this.registry = registry;
+            return this;
+        }
+
+        /**
+         * Optional: {@link Authenticator} to use to support authentication.
+         */
+        public Builder withAuthenticator(Authenticator authenticator) {
+            this.authenticator = authenticator;
+            return this;
+        }
+
+        /**
+         * Optional: {@link HttpsConfigurator} to use to support TLS/SSL
+         */
+        public Builder withHttpsConfigurator(HttpsConfigurator configurator) {
+            this.httpsConfigurator = configurator;
+            return this;
+        }
+
+        /**
+         * Build the HTTPServer
+         *
+         * <p>The effective filter supplier and bind address are computed into local variables, so
+         * the builder's own settings stay untouched and a later {@code build()} validates the same
+         * configuration again.
+         *
+         * @return the started server
+         * @throws IOException           if the underlying {@link HttpServer} cannot be created
+         * @throws IllegalStateException if mutually exclusive options were configured together
+         */
+        public MqttHTTPServer build() throws IOException {
+            Supplier<Predicate<String>> filterSupplier = sampleNameFilterSupplier;
+            if (sampleNameFilter != null) {
+                assertNull(sampleNameFilterSupplier, "cannot configure 'sampleNameFilter' and 'sampleNameFilterSupplier' at the same time");
+                filterSupplier = SampleNameFilterSupplier.of(sampleNameFilter);
+            }
+
+            if (httpServer != null) {
+                assertZero(port, "cannot configure 'httpServer' and 'port' at the same time");
+                assertNull(hostname, "cannot configure 'httpServer' and 'hostname' at the same time");
+                assertNull(inetAddress, "cannot configure 'httpServer' and 'inetAddress' at the same time");
+                assertNull(inetSocketAddress, "cannot configure 'httpServer' and 'inetSocketAddress' at the same time");
+                assertNull(httpsConfigurator, "cannot configure 'httpServer' and 'httpsConfigurator' at the same time");
+                return new MqttHTTPServer(httpServer, registry, daemon, filterSupplier, authenticator);
+            }
+
+            InetSocketAddress bindAddress;
+            if (inetSocketAddress != null) {
+                assertZero(port, "cannot configure 'inetSocketAddress' and 'port' at the same time");
+                assertNull(hostname, "cannot configure 'inetSocketAddress' and 'hostname' at the same time");
+                assertNull(inetAddress, "cannot configure 'inetSocketAddress' and 'inetAddress' at the same time");
+                bindAddress = inetSocketAddress;
+            } else if (inetAddress != null) {
+                assertNull(hostname, "cannot configure 'inetAddress' and 'hostname' at the same time");
+                bindAddress = new InetSocketAddress(inetAddress, port);
+            } else if (hostname != null) {
+                bindAddress = new InetSocketAddress(hostname, port);
+            } else {
+                bindAddress = new InetSocketAddress(port);
+            }
+
+            HttpServer createdServer;
+            if (httpsConfigurator != null) {
+                HttpsServer httpsServer = HttpsServer.create(bindAddress, 3);
+                httpsServer.setHttpsConfigurator(httpsConfigurator);
+                createdServer = httpsServer;
+            } else {
+                createdServer = HttpServer.create(bindAddress, 3);
+            }
+
+            return new MqttHTTPServer(createdServer, registry, daemon, filterSupplier, authenticator);
+        }
+
+        /** Rejects the configuration with {@code msg} when {@code o} has been set. */
+        private void assertNull(Object o, String msg) {
+            if (o != null) {
+                throw new IllegalStateException(msg);
+            }
+        }
+
+        /** Rejects the configuration with {@code msg} when {@code i} differs from its default of 0. */
+        private void assertZero(int i, String msg) {
+            if (i != 0) {
+                throw new IllegalStateException(msg);
+            }
+        }
+    }
+}
+
diff --git a/pom.xml b/pom.xml
index 87c55cb..0e665ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,6 +12,7 @@
         <module>mqtt-common</module>
         <module>mqtt-cs</module>
         <module>mqtt-ds</module>
+        <module>mqtt-exporter</module>
         <module>mqtt-example</module>
     </modules>
 
@@ -23,6 +24,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <spring.version>4.3.16.RELEASE</spring.version>
         <rocket.version>4.9.3</rocket.version>
+        <prometheus.version>0.12.0</prometheus.version>
     </properties>
 
     <dependencyManagement>
@@ -34,6 +36,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
+                <artifactId>mqtt-exporter</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
                 <artifactId>mqtt-ds</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -112,6 +119,27 @@
                 <artifactId>commons-codec</artifactId>
                 <version>1.15</version>
             </dependency>
+            <!-- for Prometheus -->
+            <dependency>
+                <groupId>io.prometheus</groupId>
+                <artifactId>simpleclient</artifactId>
+                <version>${prometheus.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.prometheus</groupId>
+                <artifactId>simpleclient_hotspot</artifactId>
+                <version>${prometheus.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.prometheus</groupId>
+                <artifactId>simpleclient_httpserver</artifactId>
+                <version>${prometheus.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.google.guava</groupId>
+                <artifactId>guava</artifactId>
+                <version>19.0</version>
+            </dependency>
             <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>