You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/04/01 02:11:09 UTC
[rocketmq-mqtt] branch main updated: [ISSUE #38][part 1] just add prometheus frame support (#39)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/main by this push:
new ab353d0 [ISSUE #38][part 1] just add prometheus frame support (#39)
ab353d0 is described below
commit ab353d08a959796090c8b0d33339fb5c42713b75
Author: tianliuliu <64...@qq.com>
AuthorDate: Fri Apr 1 10:11:04 2022 +0800
[ISSUE #38][part 1] just add prometheus frame support (#39)
* fix #38 support prometheus exporter
* fix #38 support prometheus exporter 1
* fix #38 support prometheus exporter frame
* fix #38 support prometheus collect pull status metrics
---
mqtt-cs/pom.xml | 4 +
.../rocketmq/mqtt/cs/config/ConnectConf.java | 18 +
.../rocketmq/mqtt/cs/session/loop/QueueCache.java | 18 +
.../rocketmq/mqtt/cs/starter/ExporterServer.java | 53 +++
mqtt-ds/pom.xml | 4 +
.../mqtt/ds/store/LmqQueueStoreManager.java | 6 +
mqtt-exporter/pom.xml | 36 ++
.../rocketmq/mqtt/exporter/MqttExporter.java | 51 ++
.../exporter/collector/MetricsBuilderFactory.java | 65 +++
.../exporter/collector/MqttMetricsCollector.java | 173 +++++++
.../mqtt/exporter/collector/MqttMetricsInfo.java | 90 ++++
.../mqtt/exporter/collector/SubSystem.java | 40 ++
.../exporter/exception/PrometheusException.java | 34 ++
.../mqtt/exporter/http/BackedFileOutputStream.java | 273 +++++++++++
.../mqtt/exporter/http/MqttHTTPServer.java | 513 +++++++++++++++++++++
pom.xml | 28 ++
16 files changed, 1406 insertions(+)
diff --git a/mqtt-cs/pom.xml b/mqtt-cs/pom.xml
index 582771e..d72a4f0 100644
--- a/mqtt-cs/pom.xml
+++ b/mqtt-cs/pom.xml
@@ -18,6 +18,10 @@
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
+ <artifactId>mqtt-exporter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
index 955e865..df28bf8 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
@@ -45,6 +45,8 @@ public class ConnectConf {
private int pullBatchSize = 32;
private int rpcListenPort = 7001;
private int retryIntervalSeconds = 3;
+ private int exporterPort = 9090;
+ private boolean enablePrometheus = false;
public ConnectConf() throws IOException {
ClassPathResource classPathResource = new ClassPathResource(CONF_FILE_NAME);
@@ -179,4 +181,20 @@ public class ConnectConf {
public void setRetryIntervalSeconds(int retryIntervalSeconds) {
this.retryIntervalSeconds = retryIntervalSeconds;
}
+
+ public int getExporterPort() {
+ return exporterPort;
+ }
+
+ public void setExporterPort(int exporterPort) {
+ this.exporterPort = exporterPort;
+ }
+
+ public boolean isEnablePrometheus() {
+ return enablePrometheus;
+ }
+
+ public void setEnablePrometheus(boolean enablePrometheus) {
+ this.enablePrometheus = enablePrometheus;
+ }
}
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
index d009055..89c6a71 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/QueueCache.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.mqtt.common.model.Subscription;
import org.apache.rocketmq.mqtt.common.util.StatUtil;
import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
import org.apache.rocketmq.mqtt.cs.session.Session;
+import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -182,6 +183,7 @@ public class QueueCache {
CompletableFuture<PullResult> callBackResult) {
if (subscription.isP2p() || subscription.isRetry()) {
StatUtil.addPv("NotPullCache", 1);
+ collectorPullCacheStatus("NotPullCache");
CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -189,6 +191,7 @@ public class QueueCache {
CacheEntry cacheEntry = cache.getIfPresent(queue);
if (cacheEntry == null) {
StatUtil.addPv("NoPullCache", 1);
+ collectorPullCacheStatus("NotPullCache");
CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -196,6 +199,7 @@ public class QueueCache {
if (cacheEntry.loading.get()) {
if (System.currentTimeMillis() - cacheEntry.startLoadingT > 1000) {
StatUtil.addPv("LoadPullCacheTimeout", 1);
+ collectorPullCacheStatus("LoadPullCacheTimeout");
CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -206,10 +210,12 @@ public class QueueCache {
List<Message> cacheMsgList = cacheEntry.messageList;
if (cacheMsgList.isEmpty()) {
if (loadEvent.get(queue) != null) {
+ collectorPullCacheStatus("EmptyPullCacheLATER");
StatUtil.addPv("EmptyPullCacheLATER", 1);
return LATER;
}
StatUtil.addPv("EmptyPullCache", 1);
+ collectorPullCacheStatus("EmptyPullCache");
CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -217,6 +223,7 @@ public class QueueCache {
if (queueOffset.getOffset() < cacheMsgList.get(0).getOffset()) {
StatUtil.addPv("OutPullCache", 1);
+ collectorPullCacheStatus("OutPullCache");
CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -235,9 +242,11 @@ public class QueueCache {
if (resultMsgs.isEmpty()) {
if (loadEvent.get(queue) != null) {
StatUtil.addPv("PullCacheLATER", 1);
+ collectorPullCacheStatus("PullCacheLATER");
return LATER;
}
StatUtil.addPv("OutPullCache2", 1);
+ collectorPullCacheStatus("OutPullCache2");
CompletableFuture<PullResult> pullResult = lmqQueueStore.pullMessage(toFirstTopic(subscription), queue, queueOffset, count);
callbackResult(pullResult, callBackResult);
return DONE;
@@ -246,12 +255,21 @@ public class QueueCache {
pullResult.setMessageList(resultMsgs);
callBackResult.complete(pullResult);
StatUtil.addPv("PullFromCache", 1);
+ collectorPullCacheStatus("PullFromCache");
if (loadEvent.get(queue) != null) {
return LATER;
}
return DONE;
}
+ private void collectorPullCacheStatus(String pullCacheStatus) {
+ try {
+ MqttMetricsCollector.collectPullCacheStatusTps(1, pullCacheStatus);
+ } catch (Throwable e) {
+ logger.error("", e);
+ }
+ }
+
private void loadCache(boolean isFirst, String firstTopic, Queue queue, QueueOffset queueOffset, int count,
QueueEvent event) {
loadStatus.put(queue, true);
diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/ExporterServer.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/ExporterServer.java
new file mode 100644
index 0000000..e5749d8
--- /dev/null
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/ExporterServer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.cs.starter;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+
+import org.apache.rocketmq.mqtt.common.util.HostInfo;
+import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
+import org.apache.rocketmq.mqtt.exporter.MqttExporter;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ExporterServer {
+ private static final HostInfo HOST_INFO = HostInfo.getInstall();
+ private static final String NAMESPACE = "mqtt";
+ @Resource
+ private ConnectConf connectConf;
+
+ private MqttExporter mqttExporter;
+
+ @PostConstruct
+ public void init() throws Exception {
+ if (connectConf.isEnablePrometheus()) {
+ this.mqttExporter = new MqttExporter(NAMESPACE, HOST_INFO.getName(), HOST_INFO.getAddress(),
+ connectConf.getExporterPort());
+ mqttExporter.start();
+ }
+ }
+
+ @PreDestroy
+ public void shutdown() {
+ if (this.mqttExporter != null) {
+ this.mqttExporter.shutdown();
+ }
+ }
+}
diff --git a/mqtt-ds/pom.xml b/mqtt-ds/pom.xml
index 7aac121..768c508 100644
--- a/mqtt-ds/pom.xml
+++ b/mqtt-ds/pom.xml
@@ -17,6 +17,10 @@
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
+ <artifactId>mqtt-exporter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index f521c2b..f4f1c63 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@ -55,6 +55,7 @@ import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
+import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -218,6 +219,11 @@ public class LmqQueueStoreManager implements LmqQueueStore {
result.complete(toLmqPullResult(queue, pullResult));
StatUtil.addInvoke("lmqPull", System.currentTimeMillis() - start);
StatUtil.addPv(pullResult.getPullStatus().name(), 1);
+ try {
+ MqttMetricsCollector.collectPullStatusTps(1, pullResult.getPullStatus().name());
+ } catch (Throwable e) {
+ logger.error("collect prometheus error", e);
+ }
}
@Override
diff --git a/mqtt-exporter/pom.xml b/mqtt-exporter/pom.xml
new file mode 100644
index 0000000..07fd0eb
--- /dev/null
+++ b/mqtt-exporter/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-mqtt</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>mqtt-exporter</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_hotspot</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_httpserver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/MqttExporter.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/MqttExporter.java
new file mode 100644
index 0000000..ebc6a36
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/MqttExporter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter;
+
+import io.prometheus.client.hotspot.DefaultExports;
+import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MqttExporter {
+ protected static final Logger LOG = LoggerFactory.getLogger(MqttExporter.class);
+
+ private final String nameSpace;
+ private final String hostName;
+ private final String hostIp;
+ private final int exporterPort;
+
+ public MqttExporter(String nameSpace, String hostName, String hostIp, int exporterPort) {
+ this.nameSpace = nameSpace;
+ this.hostName = hostName;
+ this.hostIp = hostIp;
+ this.exporterPort = exporterPort;
+ }
+
+ public void start() throws Exception {
+ // todo if start jvm exporter default
+ DefaultExports.initialize();
+ MqttMetricsCollector.initialize(this.nameSpace, this.hostName, this.hostIp, this.exporterPort);
+ LOG.info("metrics exporter start success");
+
+ }
+
+ public void shutdown() {
+ MqttMetricsCollector.shutdown();
+ }
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MetricsBuilderFactory.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MetricsBuilderFactory.java
new file mode 100644
index 0000000..476226d
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MetricsBuilderFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.collector;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
+import io.prometheus.client.SimpleCollector.Builder;
+
+public class MetricsBuilderFactory {
+
+ private static Builder newGaugeBuilder(MqttMetricsInfo mqttMetricsInfo) {
+ return Gauge.build().
+ name(mqttMetricsInfo.getName()).
+ help(mqttMetricsInfo.getHelp()).
+ labelNames(mqttMetricsInfo.getLabelNames());
+
+ }
+
+ private static Builder newCounterBuilder(MqttMetricsInfo mqttMetricsInfo) {
+ return Counter.build().
+ name(mqttMetricsInfo.getName()).
+ help(mqttMetricsInfo.getHelp()).
+ labelNames(mqttMetricsInfo.getLabelNames());
+ }
+
+ private static Builder newHistogramBuilder(MqttMetricsInfo mqttMetricsInfo) {
+ return Histogram.build().
+ name(mqttMetricsInfo.getName()).
+ help(mqttMetricsInfo.getHelp()).
+ labelNames(mqttMetricsInfo.getLabelNames()).
+ buckets(mqttMetricsInfo.getBuckets());
+ }
+
+ public static Builder newCollectorBuilder(MqttMetricsInfo mqttMetricsInfo) {
+ switch (mqttMetricsInfo.getType()) {
+ case COUNTER:
+ return newCounterBuilder(mqttMetricsInfo);
+ case GAUGE:
+ return newGaugeBuilder(mqttMetricsInfo);
+ case HISTOGRAM:
+ return newHistogramBuilder(mqttMetricsInfo);
+ default:
+ break;
+
+ }
+ return null;
+ }
+
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
new file mode 100644
index 0000000..820f71b
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsCollector.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.collector;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.Type;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
+import io.prometheus.client.SimpleCollector.Builder;
+import org.apache.rocketmq.mqtt.exporter.exception.PrometheusException;
+import org.apache.rocketmq.mqtt.exporter.http.MqttHTTPServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MqttMetricsCollector {
+ public static final CollectorRegistry COLLECTOR_REGISTRY = CollectorRegistry.defaultRegistry;
+ private static final Logger LOG = LoggerFactory.getLogger(MqttMetricsCollector.class);
+ private static final Map<Type, Map<MqttMetricsInfo, Collector>> ALL_TYPE_COLLECTORS = new ConcurrentHashMap<>();
+ private static volatile boolean initialized = false;
+ private static String namespace;
+ private static String hostName;
+ private static String hostIp;
+ private static MqttHTTPServer httpServer;
+
+ public static synchronized void initialize(String nameSpace, String hostName, String hostIp, int exporterPort) throws IOException {
+ if (!initialized) {
+ MqttMetricsCollector.namespace = nameSpace;
+ MqttMetricsCollector.hostName = hostName;
+ MqttMetricsCollector.hostIp = hostIp;
+ register(MqttMetricsInfo.values());
+ httpServer = new MqttHTTPServer(new InetSocketAddress(exporterPort), MqttMetricsCollector.COLLECTOR_REGISTRY, false);
+ initialized = true;
+ }
+ }
+
+ private static void register(MqttMetricsInfo[] mqttMetricsInfos) {
+ for (MqttMetricsInfo metricsInfo : mqttMetricsInfos) {
+ register(metricsInfo);
+ }
+ }
+
+ public static void register(MqttMetricsInfo metricsInfo) {
+ Map<MqttMetricsInfo, Collector> mqttMetricsInfoCollectorTypeMap = ALL_TYPE_COLLECTORS.get(metricsInfo.getType());
+ if (mqttMetricsInfoCollectorTypeMap == null) {
+ mqttMetricsInfoCollectorTypeMap = new ConcurrentHashMap<>();
+ Map<MqttMetricsInfo, Collector> oldMap = ALL_TYPE_COLLECTORS.putIfAbsent(metricsInfo.getType(),
+ mqttMetricsInfoCollectorTypeMap);
+
+ if (oldMap != null) {
+ mqttMetricsInfoCollectorTypeMap = oldMap;
+ }
+ }
+ if (mqttMetricsInfoCollectorTypeMap.get(metricsInfo) == null) {
+ Builder builder = MetricsBuilderFactory.newCollectorBuilder(metricsInfo);
+ if (builder == null) {
+ return;
+ }
+ mqttMetricsInfoCollectorTypeMap.putIfAbsent(metricsInfo, buildNameSpace(builder, metricsInfo).register(COLLECTOR_REGISTRY));
+ }
+ }
+
+ public static void unRegister(MqttMetricsInfo metricsInfo) {
+ Map<MqttMetricsInfo, Collector> mqttMetricsInfoCollectorTypeMap = ALL_TYPE_COLLECTORS.get(metricsInfo.getType());
+ if (mqttMetricsInfoCollectorTypeMap == null) {
+ return;
+ }
+ Collector collector = mqttMetricsInfoCollectorTypeMap.get(metricsInfo);
+ if (collector == null) {
+ return;
+ }
+ COLLECTOR_REGISTRY.unregister(collector);
+ mqttMetricsInfoCollectorTypeMap.remove(metricsInfo, collector);
+
+ }
+
+ private static Builder buildNameSpace(Builder builder, MqttMetricsInfo mqttMetricsInfo) {
+ return builder.namespace(namespace).subsystem(mqttMetricsInfo.getSubSystem().getValue());
+ }
+
+ private static String[] paddingClusterLabelValues(String... labelValues) {
+ String[] newLabelValues = new String[labelValues.length + 2];
+ int index = 0;
+ newLabelValues[index++] = hostName;
+ newLabelValues[index++] = hostIp;
+ for (int i = 0; i < labelValues.length; i++, index++) {
+ newLabelValues[index] = labelValues[i];
+ }
+ return newLabelValues;
+ }
+
+ private static void collect(MqttMetricsInfo mqttMetricsInfo, long val, String... labels) throws PrometheusException {
+ Map<MqttMetricsInfo, Collector> mqttMetricsInfoCollectorTypeMap = ALL_TYPE_COLLECTORS.get(mqttMetricsInfo.getType());
+ if (mqttMetricsInfoCollectorTypeMap == null) {
+ throw new PrometheusException("mqttMetricsInfo unregistered or collector type not support: " + mqttMetricsInfo);
+ }
+ Collector collector = mqttMetricsInfoCollectorTypeMap.get(mqttMetricsInfo);
+ if (collector == null) {
+ throw new PrometheusException("mqttMetricsInfo unregistered or collector type not support: " + mqttMetricsInfo);
+ }
+
+ try {
+ if (mqttMetricsInfo.getType() == Type.COUNTER) {
+ Counter counter = (Counter)collector;
+ counter.labels(paddingClusterLabelValues(labels)).inc(val);
+ } else if (mqttMetricsInfo.getType() == Type.GAUGE) {
+ Gauge gauge = (Gauge)collector;
+ gauge.labels(paddingClusterLabelValues(labels)).set(val);
+ } else if (mqttMetricsInfo.getType() == Type.HISTOGRAM) {
+ Histogram histogram = (Histogram)collector;
+ histogram.labels(paddingClusterLabelValues(labels)).observe(val);
+ }
+ } catch (Exception e) {
+ LOG.error("collect metrics exception.", labels2String(labels), val, e);
+ throw new PrometheusException("collect metrics exception", e);
+ }
+ }
+
+ public static void collectDemoTps(long val, String... labels) throws PrometheusException {
+ collect(MqttMetricsInfo.DEMO_TPS, val, labels);
+ }
+
+ public static void collectDemoGuage(long val, String... labels) throws PrometheusException {
+ collect(MqttMetricsInfo.DEMO_GAUGE, val, labels);
+ }
+
+ public static void collectDemoLatency(long val, String... labels) throws PrometheusException {
+ collect(MqttMetricsInfo.DEMO_LATENCY, val, labels);
+ }
+
+ public static void collectPullStatusTps(long val, String... labels) throws PrometheusException {
+ collect(MqttMetricsInfo.PULL_STATUS_TPS, val, labels);
+ }
+
+ public static void collectPullCacheStatusTps(long val, String... labels) throws PrometheusException {
+ collect(MqttMetricsInfo.PULL_CACHE_STATUS_TPS, val, labels);
+ }
+
+ private static String labels2String(String... labels) {
+ StringBuilder sb = new StringBuilder(128);
+ for (String label : labels) {
+ sb.append(label).append(";");
+ }
+ return sb.toString();
+ }
+
+ public static void shutdown() {
+ if (httpServer != null) {
+ httpServer.close();
+ }
+ }
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
new file mode 100644
index 0000000..f3b2e14
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/MqttMetricsInfo.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.collector;
+
+import java.util.Arrays;
+
+import io.prometheus.client.Collector.Type;
+
+public enum MqttMetricsInfo {
+ DEMO_TPS(Type.COUNTER, SubSystem.CS, "tps_total", "demo tps.", null,
+ "hostName", "hostIp"),
+ DEMO_GAUGE(Type.GAUGE, SubSystem.DS,"demo_gauge", "demo gauge status.",
+ null,
+ "hostName", "hostIp"),
+ DEMO_LATENCY(Type.HISTOGRAM, SubSystem.DS,"demo_latency", "latency in microsecond.",
+ new double[] {100, 300, 500, 1000, 3000, 5000, 10000, 50000},
+ "hostName", "hostIp"),
+ PULL_STATUS_TPS(Type.COUNTER, SubSystem.DS, "pull_status_tps_total", "ds pull msg status tps.", null,
+ "hostName", "hostIp", "pullStatus"),
+ PULL_CACHE_STATUS_TPS(Type.COUNTER, SubSystem.CS, "pull_cache_status_tps_total", "ds pull cache status tps.", null,
+ "hostName", "hostIp", "pullCacheStatus");
+
+
+ private final Type type;
+ private final SubSystem subSystem;
+ private final String name;
+ private final String help;
+ private final double[] buckets;
+ private final String[] labelNames;
+
+ MqttMetricsInfo(Type type, SubSystem subSystem, String name, String help, double[] buckets, String... labelNames) {
+ this.type = type;
+ this.subSystem = subSystem;
+ this.name = name;
+ this.help = help;
+ this.buckets = buckets;
+ this.labelNames = labelNames;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getHelp() {
+ return help;
+ }
+
+ public double[] getBuckets() {
+ return buckets;
+ }
+
+ public String[] getLabelNames() {
+ return labelNames;
+ }
+
+ public SubSystem getSubSystem() {
+ return subSystem;
+ }
+
+ @Override
+ public String toString() {
+ return "MqttMetricsInfo{" +
+ "type=" + type +
+ ", subSystem=" + subSystem +
+ ", name='" + name + '\'' +
+ ", help='" + help + '\'' +
+ ", buckets=" + Arrays.toString(buckets) +
+ ", labelNames=" + Arrays.toString(labelNames) +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/SubSystem.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/SubSystem.java
new file mode 100644
index 0000000..73adf64
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/collector/SubSystem.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.collector;
+
+public enum SubSystem {
+ DS("ds"),
+ CS("cs");
+
+ private final String value;
+
+ SubSystem(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "SubSystem{" +
+ "value='" + value + '\'' +
+ '}';
+ }
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/exception/PrometheusException.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/exception/PrometheusException.java
new file mode 100644
index 0000000..fb9a8b8
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/exception/PrometheusException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.exception;
+
+public class PrometheusException extends Exception {
+
+ public PrometheusException(String message) {
+ super(message);
+ }
+
+ public PrometheusException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PrometheusException(Throwable cause) {
+ super(cause);
+ }
+
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/BackedFileOutputStream.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/BackedFileOutputStream.java
new file mode 100644
index 0000000..df4506c
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/BackedFileOutputStream.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.http;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Objects;
+
+import com.google.common.io.ByteSource;
+
+/**
+ * An {@link OutputStream} that starts buffering to a byte array, but switches to file buffering
+ * once the data reaches a configurable size.
+ *
+ * <p>Temporary files created by this stream may live in the local filesystem until either:
+ *
+ * <ul>
+ * <li>{@link #reset} is called (removing the data in this stream and deleting the file), or...
+ * <li>this stream (or, more precisely, its {@link #asByteSource} view) is finalized during
+ * garbage collection, <strong>AND</strong> this stream was not constructed with {@linkplain
+ * #BackedFileOutputStream(int) the 1-arg constructor}
+ * </ul>
+ *
+ * <p>This class is thread-safe.
+ */
+public class BackedFileOutputStream extends OutputStream {
+ private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+ private final File parentDirectory;
+ private final int fileThreshold;
+ private final boolean resetOnFinalize;
+ private final ByteSource source;
+
+ private OutputStream out;
+ private MemoryOutput memory;
+ private File file;
+
+ /**
+ * Creates a new instance that uses the given file threshold, and does
+ * not reset the data when the {@link ByteSource} returned by
+ * {@link #asByteSource} is finalized.
+ *
+ * @param fileThreshold the number of bytes before the stream should
+ * switch to buffering to a file
+ */
+ public BackedFileOutputStream(int fileThreshold) {
+ this(fileThreshold, false, null);
+ }
+
+ /**
+ * Creates a new instance that uses the given file threshold, and
+ * optionally resets the data when the {@link ByteSource} returned
+ * by {@link #asByteSource} is finalized.
+ *
+ * @param fileThreshold the number of bytes before the stream should
+ * switch to buffering to a file
+ * @param resetOnFinalize if true, the {@link #reset} method will
+ * be called when the {@link ByteSource} returned by {@link
+ * #asByteSource} is finalized
+ */
+ public BackedFileOutputStream(int fileThreshold, boolean resetOnFinalize, File parentDirectory) {
+ this.parentDirectory = parentDirectory;
+ this.fileThreshold = fileThreshold;
+ this.resetOnFinalize = resetOnFinalize;
+ this.memory = new MemoryOutput();
+ this.out = memory;
+
+ if (resetOnFinalize) {
+ this.source = new ByteSource() {
+ @Override
+ public InputStream openStream() throws IOException {
+ return openInputStream();
+ }
+
+ @Override
+ protected void finalize() {
+ try {
+ reset();
+ } catch (Throwable t) {
+ t.printStackTrace(System.err);
+ }
+ }
+ };
+ } else {
+ this.source = new ByteSource() {
+ @Override
+ public InputStream openStream() throws IOException {
+ return openInputStream();
+ }
+ };
+ }
+ }
+
+ /**
+ * Returns the file holding the data (possibly null).
+ */
+ public synchronized File getFile() {
+ return file;
+ }
+
+ /**
+ * Returns a readable {@link ByteSource} view of the data that has been
+ * written to this stream.
+ *
+ * @since 15.0
+ */
+ public ByteSource asByteSource() {
+ return source;
+ }
+
+ private synchronized InputStream openInputStream() throws IOException {
+ if (file != null) {
+ return new FileInputStream(file);
+ } else {
+ return new ByteArrayInputStream(memory.getBuffer(), 0, memory.getCount());
+ }
+ }
+
+ /**
+ * Calls {@link #close} if not already closed, and then resets this
+ * object back to its initial state, for reuse. If data was buffered
+ * to a file, it will be deleted.
+ *
+ * @throws IOException if an I/O error occurred while deleting the file buffer
+ */
+ public synchronized void reset() throws IOException {
+ try {
+ close();
+ } finally {
+ if (memory == null) {
+ memory = new MemoryOutput();
+ } else {
+ memory.reset();
+ }
+ out = memory;
+ if (file != null) {
+ File deleteMe = file;
+ file = null;
+ if (!deleteMe.delete()) {
+ throw new IOException("Could not delete: " + deleteMe);
+ }
+ }
+ }
+ }
+
+ @Override
+ public synchronized void write(int b) throws IOException {
+ update(1);
+ out.write(b);
+ }
+
+ @Override
+ public synchronized void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public synchronized void write(byte[] b, int off, int len)
+ throws IOException {
+ update(len);
+ out.write(b, off, len);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ out.close();
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ out.flush();
+ }
+
+ /**
+ * Checks if writing {@code len} bytes would go over threshold, and
+ * switches to file buffering if so.
+ */
+ private void update(int len) throws IOException {
+ if (memory != null && (memory.getCount() + len > fileThreshold)) {
+ File temp = File.createTempFile("FileBackedOutputStream", null, parentDirectory);
+ if (resetOnFinalize) {
+ // Finalizers are not guaranteed to be called on system shutdown;
+ // this is insurance.
+ temp.deleteOnExit();
+ }
+ try {
+ FileOutputStream transfer = new FileOutputStream(temp);
+ transfer.write(memory.getBuffer(), 0, memory.getCount());
+ transfer.flush();
+ // We've successfully transferred the data; switch to writing to file
+ out = transfer;
+ } catch (IOException e) {
+ temp.delete();
+ throw e;
+ }
+
+ file = temp;
+ memory = null;
+ }
+ }
+
+ public long size() {
+ if (file != null) {
+ return file.length();
+ } else {
+ return memory.getCount();
+ }
+ }
+
+ public synchronized void writeTo(OutputStream out) throws IOException {
+ if (file != null) {
+ FileInputStream fileInputStream = null;
+ try {
+ fileInputStream = new FileInputStream(file);
+ if (transferTo(fileInputStream, out) != file.length()) {
+ throw new IOException("Bug in BackedFileOutputStream");
+ }
+ } finally {
+ if (fileInputStream != null) {
+ fileInputStream.close();
+ }
+ }
+ } else {
+ out.write(memory.getBuffer(), 0, memory.getCount());
+ }
+ }
+
+ private long transferTo(FileInputStream fileInputStream, OutputStream out) throws IOException {
+ Objects.requireNonNull(out, "out");
+ long transferred = 0;
+ byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+ int read;
+ while ((read = fileInputStream.read(buffer, 0, DEFAULT_BUFFER_SIZE)) >= 0) {
+ out.write(buffer, 0, read);
+ transferred += read;
+ }
+ return transferred;
+ }
+
+ /**
+ * ByteArrayOutputStream that exposes its internals.
+ */
+ private static class MemoryOutput extends ByteArrayOutputStream {
+ byte[] getBuffer() {
+ return buf;
+ }
+
+ int getCount() {
+ return count;
+ }
+ }
+}
diff --git a/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/MqttHTTPServer.java b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/MqttHTTPServer.java
new file mode 100644
index 0000000..cc97527
--- /dev/null
+++ b/mqtt-exporter/src/main/java/org/apache/rocketmq/mqtt/exporter/http/MqttHTTPServer.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.exporter.http;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.GZIPOutputStream;
+
+import com.sun.net.httpserver.Authenticator;
+import com.sun.net.httpserver.HttpContext;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Predicate;
+import io.prometheus.client.SampleNameFilter;
+import io.prometheus.client.Supplier;
+import io.prometheus.client.exporter.SampleNameFilterSupplier;
+import io.prometheus.client.exporter.common.TextFormat;
+
+/**
+ * Expose Prometheus metrics using a plain Java HttpServer.
+ * <p>
+ * Example Usage:
+ * <pre>
+ * {@code
+ * MqttHTTPServer server = new MqttHTTPServer(1234);
+ * }
+ * </pre>
+ */
+public class MqttHTTPServer implements Closeable {
+ private static final File TEMP_DIR;
+
+ static {
+ String temDirName = System.getProperty("--logging.path", "/home/admin/logs/mqtt-exporter/");
+ TEMP_DIR = new File(temDirName);
+ }
+
+ static {
+ if (!System.getProperties().containsKey("sun.net.httpserver.maxReqTime")) {
+ System.setProperty("sun.net.httpserver.maxReqTime", "60");
+ }
+
+ if (!System.getProperties().containsKey("sun.net.httpserver.maxRspTime")) {
+ System.setProperty("sun.net.httpserver.maxRspTime", "600");
+ }
+ }
+
+ protected final HttpServer server;
+ protected final ExecutorService executorService;
+
+ /**
+ * Start an HTTP server serving Prometheus metrics from the given registry using the given {@link HttpServer}.
+ * The {@code httpServer} is expected to already be bound to an address
+ */
+ public MqttHTTPServer(HttpServer httpServer, CollectorRegistry registry, boolean daemon) throws IOException {
+ this(httpServer, registry, daemon, null, null);
+ }
+
+ /**
+ * Start an HTTP server serving Prometheus metrics from the given registry.
+ */
+ public MqttHTTPServer(InetSocketAddress addr, CollectorRegistry registry, boolean daemon) throws IOException {
+ this(HttpServer.create(addr, 3), registry, daemon);
+ }
+
+ /**
+ * Start an HTTP server serving Prometheus metrics from the given registry using non-daemon threads.
+ */
+ public MqttHTTPServer(InetSocketAddress addr, CollectorRegistry registry) throws IOException {
+ this(addr, registry, false);
+ }
+
+ /**
+ * Start an HTTP server serving the default Prometheus registry.
+ */
+ public MqttHTTPServer(int port, boolean daemon) throws IOException {
+ this(new InetSocketAddress(port), CollectorRegistry.defaultRegistry, daemon);
+ }
+
+ /**
+ * Start an HTTP server serving the default Prometheus registry using non-daemon threads.
+ */
+ public MqttHTTPServer(int port) throws IOException {
+ this(port, false);
+ }
+
+ /**
+ * Start an HTTP server serving the default Prometheus registry.
+ */
+ public MqttHTTPServer(String host, int port, boolean daemon) throws IOException {
+ this(new InetSocketAddress(host, port), CollectorRegistry.defaultRegistry, daemon);
+ }
+
+ /**
+ * Start an HTTP server serving the default Prometheus registry using non-daemon threads.
+ */
+ public MqttHTTPServer(String host, int port) throws IOException {
+ this(new InetSocketAddress(host, port), CollectorRegistry.defaultRegistry, false);
+ }
+
+ private MqttHTTPServer(HttpServer httpServer, CollectorRegistry registry, boolean daemon,
+ Supplier<Predicate<String>> sampleNameFilterSupplier, Authenticator authenticator) {
+ if (httpServer.getAddress() == null) { throw new IllegalArgumentException("HttpServer hasn't been bound to an address"); }
+
+ server = httpServer;
+ HttpHandler mHandler = new HTTPMetricHandler(registry, sampleNameFilterSupplier);
+ HttpContext mContext = server.createContext("/", mHandler);
+ if (authenticator != null) {
+ mContext.setAuthenticator(authenticator);
+ }
+ mContext = server.createContext("/metrics", mHandler);
+ if (authenticator != null) {
+ mContext.setAuthenticator(authenticator);
+ }
+ mContext = server.createContext("/-/healthy", mHandler);
+ if (authenticator != null) {
+ mContext.setAuthenticator(authenticator);
+ }
+ executorService = new ThreadPoolExecutor(
+ 2, 2, 60, TimeUnit.SECONDS,
+ new ArrayBlockingQueue(100), NamedDaemonThreadFactory.defaultThreadFactory(daemon));
+ server.setExecutor(executorService);
+ start(daemon);
+ }
+
+ protected static boolean shouldUseCompression(HttpExchange exchange) {
+ List<String> encodingHeaders = exchange.getRequestHeaders().get("Accept-Encoding");
+ if (encodingHeaders == null) { return false; }
+
+ for (String encodingHeader : encodingHeaders) {
+ String[] encodings = encodingHeader.split(",");
+ for (String encoding : encodings) {
+ if (encoding.trim().equalsIgnoreCase("gzip")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ protected static Set<String> parseQuery(String query) throws IOException {
+ Set<String> names = new HashSet<String>();
+ if (query != null) {
+ String[] pairs = query.split("&");
+ for (String pair : pairs) {
+ int idx = pair.indexOf("=");
+ if (idx != -1 && URLDecoder.decode(pair.substring(0, idx), "UTF-8").equals("name[]")) {
+ names.add(URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
+ }
+ }
+ }
+ return names;
+ }
+
+ /**
+ * Start an HTTP server by making sure that its background thread inherit proper daemon flag.
+ */
+ private void start(boolean daemon) {
+ if (daemon == Thread.currentThread().isDaemon()) {
+ server.start();
+ } else {
+ FutureTask<Void> startTask = new FutureTask<Void>(new Runnable() {
+ @Override
+ public void run() {
+ server.start();
+ }
+ }, null);
+ NamedDaemonThreadFactory.defaultThreadFactory(daemon).newThread(startTask).start();
+ try {
+ startTask.get();
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Unexpected exception on starting HTTPSever", e);
+ } catch (InterruptedException e) {
+ // This is possible only if the current tread has been interrupted,
+ // but in real use cases this should not happen.
+ // In any case, there is nothing to do, except to propagate interrupted flag.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Stop the HTTPServer.
+ */
+ @Override
+ public void close() {
+ server.stop(0);
+ executorService.shutdown(); // Free any (parked/idle) threads in pool
+ }
+
+ /**
+ * Gets the port number.
+ */
+ public int getPort() {
+ return server.getAddress().getPort();
+ }
+
+ private static class LocalByteArray extends ThreadLocal<BackedFileOutputStream> {
+ @Override
+ protected BackedFileOutputStream initialValue() {
+ return new BackedFileOutputStream((1 << 20) * 16, false, TEMP_DIR);
+ }
+ }
+
+ /**
+ * Handles Metrics collections from the given registry.
+ */
+ public static class HTTPMetricHandler implements HttpHandler {
+ private final static String HEALTHY_RESPONSE = "Exporter is Healthy.";
+ private final CollectorRegistry registry;
+ private final LocalByteArray response = new LocalByteArray();
+ private final Supplier<Predicate<String>> sampleNameFilterSupplier;
+
+ public HTTPMetricHandler(CollectorRegistry registry) {
+ this(registry, null);
+ }
+
+ public HTTPMetricHandler(CollectorRegistry registry, Supplier<Predicate<String>> sampleNameFilterSupplier) {
+ this.registry = registry;
+ this.sampleNameFilterSupplier = sampleNameFilterSupplier;
+ }
+
+ @Override
+ public void handle(HttpExchange t) throws IOException {
+ String query = t.getRequestURI().getRawQuery();
+ String contextPath = t.getHttpContext().getPath();
+ BackedFileOutputStream response = this.response.get();
+ response.reset();
+ OutputStreamWriter osw = new OutputStreamWriter(response, Charset.forName("UTF-8"));
+ if ("/-/healthy".equals(contextPath)) {
+ osw.write(HEALTHY_RESPONSE);
+ } else {
+ String contentType = TextFormat.chooseContentType(t.getRequestHeaders().getFirst("Accept"));
+ t.getResponseHeaders().set("Content-Type", contentType);
+ Predicate<String> filter = sampleNameFilterSupplier == null ? null : sampleNameFilterSupplier.get();
+ filter = SampleNameFilter.restrictToNamesEqualTo(filter, parseQuery(query));
+ if (filter == null) {
+ TextFormat.writeFormat(contentType, osw, registry.metricFamilySamples());
+ } else {
+ TextFormat.writeFormat(contentType, osw, registry.filteredMetricFamilySamples(filter));
+ }
+ }
+
+ osw.close();
+
+ if (shouldUseCompression(t)) {
+ t.getResponseHeaders().set("Content-Encoding", "gzip");
+ t.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0);
+ final GZIPOutputStream os = new GZIPOutputStream(t.getResponseBody());
+ try {
+ response.writeTo(os);
+ } finally {
+ os.close();
+ }
+ } else {
+ long contentLength = response.size();
+ if (contentLength > 0) {
+ t.getResponseHeaders().set("Content-Length", String.valueOf(contentLength));
+ }
+ if (t.getRequestMethod().equals("HEAD")) {
+ contentLength = -1;
+ }
+ t.sendResponseHeaders(HttpURLConnection.HTTP_OK, contentLength);
+ response.writeTo(t.getResponseBody());
+ }
+ t.close();
+ response.reset();
+ }
+ }
+
+ static class NamedDaemonThreadFactory implements ThreadFactory {
+ private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
+
+ private final int poolNumber = POOL_NUMBER.getAndIncrement();
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final ThreadFactory delegate;
+ private final boolean daemon;
+
+ NamedDaemonThreadFactory(ThreadFactory delegate, boolean daemon) {
+ this.delegate = delegate;
+ this.daemon = daemon;
+ }
+
+ static ThreadFactory defaultThreadFactory(boolean daemon) {
+ return new NamedDaemonThreadFactory(Executors.defaultThreadFactory(), daemon);
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = delegate.newThread(r);
+ t.setName(String.format("prometheus-http-%d-%d", poolNumber, threadNumber.getAndIncrement()));
+ t.setDaemon(daemon);
+ return t;
+ }
+ }
+
+ /**
+ * We keep the original constructors of {@link MqttHTTPServer} for compatibility, but new configuration
+ * parameters like {@code sampleNameFilter} must be configured using the Builder.
+ */
+ public static class Builder {
+
+ private int port = 0;
+ private String hostname = null;
+ private InetAddress inetAddress = null;
+ private InetSocketAddress inetSocketAddress = null;
+ private HttpServer httpServer = null;
+ private CollectorRegistry registry = CollectorRegistry.defaultRegistry;
+ private boolean daemon = false;
+ private Predicate<String> sampleNameFilter;
+ private Supplier<Predicate<String>> sampleNameFilterSupplier;
+ private Authenticator authenticator;
+ private HttpsConfigurator httpsConfigurator;
+
+ /**
+ * Port to bind to. Must not be called together with {@link #withInetSocketAddress(InetSocketAddress)}
+ * or {@link #withHttpServer(HttpServer)}. Default is 0, indicating that a random port will be selected.
+ */
+ public Builder withPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /**
+ * Use this hostname to resolve the IP address to bind to. Must not be called together with
+ * {@link #withInetAddress(InetAddress)} or {@link #withInetSocketAddress(InetSocketAddress)}
+ * or {@link #withHttpServer(HttpServer)}.
+ * Default is empty, indicating that the HTTPServer binds to the wildcard address.
+ */
+ public Builder withHostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ /**
+ * Bind to this IP address. Must not be called together with {@link #withHostname(String)} or
+ * {@link #withInetSocketAddress(InetSocketAddress)} or {@link #withHttpServer(HttpServer)}.
+ * Default is empty, indicating that the HTTPServer binds to the wildcard address.
+ */
+ public Builder withInetAddress(InetAddress address) {
+ this.inetAddress = address;
+ return this;
+ }
+
+ /**
+ * Listen on this address. Must not be called together with {@link #withPort(int)},
+ * {@link #withHostname(String)}, {@link #withInetAddress(InetAddress)}, or {@link #withHttpServer(HttpServer)}.
+ */
+ public Builder withInetSocketAddress(InetSocketAddress address) {
+ this.inetSocketAddress = address;
+ return this;
+ }
+
+ /**
+ * Use this httpServer. The {@code httpServer} is expected to already be bound to an address.
+ * Must not be called together with {@link #withPort(int)}, or {@link #withHostname(String)},
+ * or {@link #withInetAddress(InetAddress)}, or {@link #withInetSocketAddress(InetSocketAddress)}.
+ */
+ public Builder withHttpServer(HttpServer httpServer) {
+ this.httpServer = httpServer;
+ return this;
+ }
+
+ /**
+ * By default, the {@link MqttHTTPServer} uses non-daemon threads. Set this to {@code true} to
+ * run the {@link MqttHTTPServer} with daemon threads.
+ */
+ public Builder withDaemonThreads(boolean daemon) {
+ this.daemon = daemon;
+ return this;
+ }
+
+ /**
+ * Optional: Only export time series where {@code sampleNameFilter.test(name)} returns true.
+ * <p>
+ * Use this if the sampleNameFilter remains the same throughout the lifetime of the HTTPServer.
+ * If the sampleNameFilter changes during runtime, use {@link #withSampleNameFilterSupplier(Supplier)}.
+ */
+ public Builder withSampleNameFilter(Predicate<String> sampleNameFilter) {
+ this.sampleNameFilter = sampleNameFilter;
+ return this;
+ }
+
+ /**
+ * Optional: Only export time series where {@code sampleNameFilter.test(name)} returns true.
+ * <p>
+ * Use this if the sampleNameFilter may change during runtime, like for example if you have a
+ * hot reload mechanism for your filter config.
+ * If the sampleNameFilter remains the same throughout the lifetime of the HTTPServer,
+ * use {@link #withSampleNameFilter(Predicate)} instead.
+ */
+ public Builder withSampleNameFilterSupplier(Supplier<Predicate<String>> sampleNameFilterSupplier) {
+ this.sampleNameFilterSupplier = sampleNameFilterSupplier;
+ return this;
+ }
+
+ /**
+ * Optional: Default is {@link CollectorRegistry#defaultRegistry}.
+ */
+ public Builder withRegistry(CollectorRegistry registry) {
+ this.registry = registry;
+ return this;
+ }
+
+ /**
+ * Optional: {@link Authenticator} to use to support authentication.
+ */
+ public Builder withAuthenticator(Authenticator authenticator) {
+ this.authenticator = authenticator;
+ return this;
+ }
+
+ /**
+ * Optional: {@link HttpsConfigurator} to use to support TLS/SSL
+ */
+ public Builder withHttpsConfigurator(HttpsConfigurator configurator) {
+ this.httpsConfigurator = configurator;
+ return this;
+ }
+
+ /**
+ * Build the HTTPServer
+ *
+ * @throws IOException
+ */
+ public MqttHTTPServer build() throws IOException {
+ if (sampleNameFilter != null) {
+ assertNull(sampleNameFilterSupplier, "cannot configure 'sampleNameFilter' and 'sampleNameFilterSupplier' at the same time");
+ sampleNameFilterSupplier = SampleNameFilterSupplier.of(sampleNameFilter);
+ }
+
+ if (httpServer != null) {
+ assertZero(port, "cannot configure 'httpServer' and 'port' at the same time");
+ assertNull(hostname, "cannot configure 'httpServer' and 'hostname' at the same time");
+ assertNull(inetAddress, "cannot configure 'httpServer' and 'inetAddress' at the same time");
+ assertNull(inetSocketAddress, "cannot configure 'httpServer' and 'inetSocketAddress' at the same time");
+ assertNull(httpsConfigurator, "cannot configure 'httpServer' and 'httpsConfigurator' at the same time");
+ return new MqttHTTPServer(httpServer, registry, daemon, sampleNameFilterSupplier, authenticator);
+ } else if (inetSocketAddress != null) {
+ assertZero(port, "cannot configure 'inetSocketAddress' and 'port' at the same time");
+ assertNull(hostname, "cannot configure 'inetSocketAddress' and 'hostname' at the same time");
+ assertNull(inetAddress, "cannot configure 'inetSocketAddress' and 'inetAddress' at the same time");
+ } else if (inetAddress != null) {
+ assertNull(hostname, "cannot configure 'inetAddress' and 'hostname' at the same time");
+ inetSocketAddress = new InetSocketAddress(inetAddress, port);
+ } else if (hostname != null) {
+ inetSocketAddress = new InetSocketAddress(hostname, port);
+ } else {
+ inetSocketAddress = new InetSocketAddress(port);
+ }
+
+ HttpServer httpServer = null;
+ if (httpsConfigurator != null) {
+ httpServer = HttpsServer.create(inetSocketAddress, 3);
+ ((HttpsServer)httpServer).setHttpsConfigurator(httpsConfigurator);
+ } else {
+ httpServer = HttpServer.create(inetSocketAddress, 3);
+ }
+
+ return new MqttHTTPServer(httpServer, registry, daemon, sampleNameFilterSupplier, authenticator);
+ }
+
+ private void assertNull(Object o, String msg) {
+ if (o != null) {
+ throw new IllegalStateException(msg);
+ }
+ }
+
+ private void assertZero(int i, String msg) {
+ if (i != 0) {
+ throw new IllegalStateException(msg);
+ }
+ }
+ }
+}
+
diff --git a/pom.xml b/pom.xml
index 87c55cb..0e665ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,6 +12,7 @@
<module>mqtt-common</module>
<module>mqtt-cs</module>
<module>mqtt-ds</module>
+ <module>mqtt-exporter</module>
<module>mqtt-example</module>
</modules>
@@ -23,6 +24,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>4.3.16.RELEASE</spring.version>
<rocket.version>4.9.3</rocket.version>
+ <prometheus.version>0.12.0</prometheus.version>
</properties>
<dependencyManagement>
@@ -34,6 +36,11 @@
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
+ <artifactId>mqtt-exporter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>mqtt-ds</artifactId>
<version>${project.version}</version>
</dependency>
@@ -112,6 +119,27 @@
<artifactId>commons-codec</artifactId>
<version>1.15</version>
</dependency>
+ <!-- for prometheus -->
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_hotspot</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_httpserver</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>19.0</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>