You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/19 07:03:23 UTC
[incubator-inlong] branch master updated: [INLONG-2106] DataProxy expose metric data using prometheus HttpServer. (#2165)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e60f461 [INLONG-2106] DataProxy expose metric data using prometheus HttpServer. (#2165)
e60f461 is described below
commit e60f461b9ea1b53401a994139395b756f50ac470
Author: 卢春亮 <94...@qq.com>
AuthorDate: Wed Jan 19 15:03:18 2022 +0800
[INLONG-2106] DataProxy expose metric data using prometheus HttpServer. (#2165)
---
inlong-dataproxy/conf/common.properties | 3 +-
.../config/holder/CommonPropertiesHolder.java | 30 ++++
.../prometheus/PrometheusMetricListener.java | 183 ++++++++++++++++++---
inlong-dataproxy/pom.xml | 6 +
4 files changed, 201 insertions(+), 21 deletions(-)
diff --git a/inlong-dataproxy/conf/common.properties b/inlong-dataproxy/conf/common.properties
index a7fc28c..f3a112b 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -25,4 +25,5 @@ configCheckInterval=10000
metricDomains=DataProxy
metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
-metricDomains.DataProxy.snapshotInterval=60000
\ No newline at end of file
+metricDomains.DataProxy.snapshotInterval=60000
+prometheusHttpPort=8080
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index adbaba0..0d12c43 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -120,6 +120,36 @@ public class CommonPropertiesHolder {
}
/**
+ * Looks up an integer configuration value by key.
+ * <p>
+ * The mapped string is trimmed before it is parsed; a value that is not a valid integer raises a
+ * {@link NumberFormatException}.
+ * </p>
+ *
+ * @param key configuration key to look up
+ * @param defaultValue value returned when the key has no mapping
+ * @return the parsed integer mapped to key, or defaultValue when the key is unmapped
+ */
+ public static Integer getInteger(String key, Integer defaultValue) {
+ String raw = get().get(key);
+ if (raw == null) {
+ return defaultValue;
+ }
+ return Integer.valueOf(raw.trim());
+ }
+
+ /**
+ * Gets value mapped to key, returning null if unmapped.
+ * <p>
+ * Delegates to {@link #getInteger(String, Integer)} with a null default.
+ * </p>
+ * <p>
+ * Note that this method returns an object as opposed to a primitive. The configuration key requested may not be
+ * mapped to a value, and by returning the primitive's object wrapper we can return null. If the key does not exist
+ * and the return value of this method is assigned directly to a primitive, a {@link NullPointerException} will be
+ * thrown.
+ * </p>
+ *
+ * @param key to be found
+ * @return value associated with key or null if unmapped
+ * @throws NumberFormatException if the mapped value is not a valid integer
+ */
+ public static Integer getInteger(String key) {
+ return getInteger(key, null);
+ }
+
+ /**
* getAuditFormatInterval
*
* @return
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
index 97e1133..2dcface 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
@@ -19,8 +19,24 @@ package org.apache.inlong.dataproxy.metrics.prometheus;
import static org.apache.inlong.commons.config.metrics.MetricItemMBean.DOMAIN_SEPARATOR;
import static org.apache.inlong.commons.config.metrics.MetricRegister.JMX_DOMAIN;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_NODE_DURATION;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_READ_FAIL_COUNT;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_READ_FAIL_SIZE;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_READ_SUCCESS_COUNT;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_READ_SUCCESS_SIZE;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_COUNT;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_FAIL_COUNT;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_FAIL_SIZE;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_SIZE;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_SUCCESS_COUNT;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_SUCCESS_SIZE;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SINK_DURATION;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_WHOLE_DURATION;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -30,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.commons.config.metrics.MetricValue;
import org.apache.inlong.dataproxy.config.RemoteConfigManager;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
@@ -40,17 +55,28 @@ import org.apache.inlong.dataproxy.metrics.MetricListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.exporter.HTTPServer;
+
/**
*
* PrometheusMetricListener
*/
-public class PrometheusMetricListener implements MetricListener {
+public class PrometheusMetricListener extends Collector implements MetricListener {
- public static final Logger LOG = LoggerFactory.getLogger(MetricRegister.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricListener.class);
+ public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
+ public static final int DEFAULT_PROMETHEUS_HTTP_PORT = 8080;
+ public static final String DEFAULT_DIMENSION_LABEL = "dimension";
//
private DataProxyMetricItem metricItem;
private Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>();
+ private String metricName;
+ protected HTTPServer httpServer;
+ private Map<String, MetricItemValue> dimensionMetricValueMap = new ConcurrentHashMap<>();
+ private List<String> dimensionKeys = new ArrayList<>();
/**
* Constructor
@@ -61,31 +87,38 @@ public class PrometheusMetricListener implements MetricListener {
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
StringBuilder beanName = new StringBuilder();
beanName.append(JMX_DOMAIN).append(DOMAIN_SEPARATOR).append("type=DataProxyCounter");
- String strBeanName = beanName.toString();
+ this.metricName = beanName.toString();
try {
- ObjectName objName = new ObjectName(strBeanName);
+ ObjectName objName = new ObjectName(metricName);
mbs.registerMBean(metricItem, objName);
} catch (Exception ex) {
- LOG.error("exception while register mbean:{},error:{}", strBeanName, ex.getMessage());
- LOG.error(ex.getMessage(), ex);
+ LOG.error("exception while register mbean:{},error:{}", metricName, ex.getMessage());
}
//
- metricValueMap.put(DataProxyMetricItem.M_READ_SUCCESS_COUNT, metricItem.readSuccessCount);
- metricValueMap.put(DataProxyMetricItem.M_READ_SUCCESS_SIZE, metricItem.readSuccessSize);
- metricValueMap.put(DataProxyMetricItem.M_READ_FAIL_COUNT, metricItem.readFailCount);
- metricValueMap.put(DataProxyMetricItem.M_READ_FAIL_SIZE, metricItem.readFailSize);
+ metricValueMap.put(M_READ_SUCCESS_COUNT, metricItem.readSuccessCount);
+ metricValueMap.put(M_READ_SUCCESS_SIZE, metricItem.readSuccessSize);
+ metricValueMap.put(M_READ_FAIL_COUNT, metricItem.readFailCount);
+ metricValueMap.put(M_READ_FAIL_SIZE, metricItem.readFailSize);
//
- metricValueMap.put(DataProxyMetricItem.M_SEND_COUNT, metricItem.sendCount);
- metricValueMap.put(DataProxyMetricItem.M_SEND_SIZE, metricItem.sendSize);
+ metricValueMap.put(M_SEND_COUNT, metricItem.sendCount);
+ metricValueMap.put(M_SEND_SIZE, metricItem.sendSize);
//
- metricValueMap.put(DataProxyMetricItem.M_SEND_SUCCESS_COUNT, metricItem.sendSuccessCount);
- metricValueMap.put(DataProxyMetricItem.M_SEND_SUCCESS_SIZE, metricItem.sendSuccessSize);
- metricValueMap.put(DataProxyMetricItem.M_SEND_FAIL_COUNT, metricItem.sendFailCount);
- metricValueMap.put(DataProxyMetricItem.M_SEND_FAIL_SIZE, metricItem.sendFailSize);
+ metricValueMap.put(M_SEND_SUCCESS_COUNT, metricItem.sendSuccessCount);
+ metricValueMap.put(M_SEND_SUCCESS_SIZE, metricItem.sendSuccessSize);
+ metricValueMap.put(M_SEND_FAIL_COUNT, metricItem.sendFailCount);
+ metricValueMap.put(M_SEND_FAIL_SIZE, metricItem.sendFailSize);
//
- metricValueMap.put(DataProxyMetricItem.M_SINK_DURATION, metricItem.sinkDuration);
- metricValueMap.put(DataProxyMetricItem.M_NODE_DURATION, metricItem.nodeDuration);
- metricValueMap.put(DataProxyMetricItem.M_WHOLE_DURATION, metricItem.wholeDuration);
+ metricValueMap.put(M_SINK_DURATION, metricItem.sinkDuration);
+ metricValueMap.put(M_NODE_DURATION, metricItem.nodeDuration);
+ metricValueMap.put(M_WHOLE_DURATION, metricItem.wholeDuration);
+
+ int httpPort = CommonPropertiesHolder.getInteger(KEY_PROMETHEUS_HTTP_PORT, DEFAULT_PROMETHEUS_HTTP_PORT);
+ try {
+ this.httpServer = new HTTPServer(httpPort);
+ } catch (IOException e) {
+ LOG.error("exception while register prometheus http server:{},error:{}", metricName, e.getMessage());
+ }
+ this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL);
}
/**
@@ -97,6 +130,7 @@ public class PrometheusMetricListener implements MetricListener {
@Override
public void snapshot(String domain, List<MetricItemValue> itemValues) {
for (MetricItemValue itemValue : itemValues) {
+ // total
for (Entry<String, MetricValue> entry : itemValue.getMetrics().entrySet()) {
String fieldName = entry.getValue().name;
AtomicLong metricValue = this.metricValueMap.get(fieldName);
@@ -105,7 +139,116 @@ public class PrometheusMetricListener implements MetricListener {
metricValue.addAndGet(fieldValue);
}
}
+ // id dimension: accumulate the same metrics separately for each item key
+ String dimensionKey = itemValue.getKey();
+ MetricItemValue dimensionMetricValue = this.dimensionMetricValueMap.get(dimensionKey);
+ // first snapshot for this key: the check must be on the looked-up accumulator, not on the key,
+ // otherwise the accumulator is never created and the count loop below dereferences null
+ if (dimensionMetricValue == null) {
+ dimensionMetricValue = new MetricItemValue(dimensionKey, new ConcurrentHashMap<String, String>(),
+ new ConcurrentHashMap<String, MetricValue>());
+ // putIfAbsent followed by get keeps whichever accumulator was stored first for this key
+ this.dimensionMetricValueMap.putIfAbsent(dimensionKey, dimensionMetricValue);
+ dimensionMetricValue = this.dimensionMetricValueMap.get(dimensionKey);
+ dimensionMetricValue.getDimensions().putAll(itemValue.getDimensions());
+ // add prometheus label name
+ for (Entry<String, String> entry : itemValue.getDimensions().entrySet()) {
+ if (!this.dimensionKeys.contains(entry.getKey())) {
+ this.dimensionKeys.add(entry.getKey());
+ }
+ }
+ }
+ // count
+ for (Entry<String, MetricValue> entry : itemValue.getMetrics().entrySet()) {
+ String fieldName = entry.getValue().name;
+ MetricValue metricValue = dimensionMetricValue.getMetrics().get(fieldName);
+ if (metricValue == null) {
+ // first value seen for this field: seed the accumulator with this snapshot's value
+ metricValue = MetricValue.of(fieldName, entry.getValue().value);
+ dimensionMetricValue.getMetrics().put(metricValue.name, metricValue);
+ continue;
+ }
+ metricValue.value += entry.getValue().value;
+ }
+ }
+ }
+
+ /**
+ * Builds the Prometheus sample families exposed by this collector: one family holding the node-wide totals and
+ * one family holding the per-dimension values accumulated by snapshot.
+ *
+ * @return list holding the total family followed by the id family
+ */
+ @Override
+ public List<MetricFamilySamples> collect() {
+ // metric field names and their node-wide counters, in the order the samples are emitted
+ final String[] fieldNames = {
+ M_READ_SUCCESS_COUNT, M_READ_SUCCESS_SIZE, M_READ_FAIL_COUNT, M_READ_FAIL_SIZE,
+ M_SEND_COUNT, M_SEND_SIZE,
+ M_SEND_SUCCESS_COUNT, M_SEND_SUCCESS_SIZE, M_SEND_FAIL_COUNT, M_SEND_FAIL_SIZE,
+ M_SINK_DURATION, M_NODE_DURATION, M_WHOLE_DURATION
+ };
+ final AtomicLong[] totals = {
+ metricItem.readSuccessCount, metricItem.readSuccessSize, metricItem.readFailCount, metricItem.readFailSize,
+ metricItem.sendCount, metricItem.sendSize,
+ metricItem.sendSuccessCount, metricItem.sendSuccessSize, metricItem.sendFailCount, metricItem.sendFailSize,
+ metricItem.sinkDuration, metricItem.nodeDuration, metricItem.wholeDuration
+ };
+
+ // total: one sample per field, labelled with the field name
+ final List<MetricFamilySamples> families = new ArrayList<>();
+ final CounterMetricFamily totalFamily = new CounterMetricFamily(metricName + "&group=total",
+ "The metrics of dataproxy node.", Arrays.asList("dimension"));
+ for (int i = 0; i < fieldNames.length; i++) {
+ totalFamily.addMetric(Arrays.asList(fieldNames[i]), totals[i].get());
+ }
+ families.add(totalFamily);
+
+ // id dimension: one sample per field for every accumulated item
+ final CounterMetricFamily idFamily = new CounterMetricFamily(metricName + "&group=id",
+ "The metrics of inlong dataflow.", this.dimensionKeys);
+ for (MetricItemValue itemValue : this.dimensionMetricValueMap.values()) {
+ for (String fieldName : fieldNames) {
+ addCounterMetricFamily(fieldName, itemValue, idFamily);
+ }
}
+ families.add(idFamily);
+ return families;
}
+ /**
+ * Adds one sample to the id family for a single metric field of one dimension item.
+ * <p>
+ * Label values are produced in the iteration order of dimensionKeys, the same list the id family is created with
+ * in collect, so each value lines up with its label name: the default "dimension" label receives the metric field
+ * name, and every other label receives the item's value for that dimension, or "-" when the item lacks it.
+ * </p>
+ *
+ * @param defaultDemension metric field name; used as the value of the default dimension label and as the lookup
+ * key into the item's metrics
+ * @param itemValue accumulated dimensions and metrics of one item
+ * @param idCounter family that receives the sample
+ */
+ private void addCounterMetricFamily(String defaultDemension, MetricItemValue itemValue,
+ CounterMetricFamily idCounter) {
+ Map<String, String> dimensions = itemValue.getDimensions();
+ List<String> labelValues = new ArrayList<>(this.dimensionKeys.size());
+ for (String key : this.dimensionKeys) {
+ if (DEFAULT_DIMENSION_LABEL.equals(key)) {
+ // dimensionKeys already contains the default label, so fill its slot here instead of prepending an
+ // extra value; one value per key keeps the value count equal to the number of label names
+ labelValues.add(defaultDemension);
+ } else {
+ labelValues.add(dimensions.getOrDefault(key, "-"));
+ }
+ }
+ // a field this item never reported is exported as 0
+ long value = 0L;
+ Map<String, MetricValue> metrics = itemValue.getMetrics();
+ MetricValue metricValue = metrics.get(defaultDemension);
+ if (metricValue != null) {
+ value = metricValue.value;
+ }
+ idCounter.addMetric(labelValues, value);
+ }
}
diff --git a/inlong-dataproxy/pom.xml b/inlong-dataproxy/pom.xml
index 2494f5a..1eb58f5 100644
--- a/inlong-dataproxy/pom.xml
+++ b/inlong-dataproxy/pom.xml
@@ -50,6 +50,7 @@
<testng.version>6.14.3</testng.version>
<guava.version>19.0</guava.version>
<powermock.version>2.0.9</powermock.version>
+ <simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
</properties>
<dependencies>
@@ -115,6 +116,11 @@
<version>${guava.version}</version>
</dependency>
<dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_httpserver</artifactId>
+ <version>${simpleclient_httpserver.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>