You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/17 11:47:20 UTC
[incubator-inlong] branch master updated: [INLONG-2164] Sort-standalone expose metric data using prometheus HttpServer. (#2166)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 13377f0 [INLONG-2164] Sort-standalone expose metric data using prometheus HttpServer. (#2166)
13377f0 is described below
commit 13377f0ffc173b12d46b6479fa487b9ec0ff054d
Author: 卢春亮 <94...@qq.com>
AuthorDate: Mon Jan 17 19:47:17 2022 +0800
[INLONG-2164] Sort-standalone expose metric data using prometheus HttpServer. (#2166)
---
inlong-sort-standalone/conf/common.properties | 1 +
inlong-sort-standalone/pom.xml | 6 +
.../config/holder/CommonPropertiesHolder.java | 30 ++++
.../prometheus/PrometheusMetricListener.java | 171 +++++++++++++++++++--
4 files changed, 194 insertions(+), 14 deletions(-)
diff --git a/inlong-sort-standalone/conf/common.properties b/inlong-sort-standalone/conf/common.properties
index c794da7..2fae897 100644
--- a/inlong-sort-standalone/conf/common.properties
+++ b/inlong-sort-standalone/conf/common.properties
@@ -21,3 +21,4 @@ clusterName=sort-standalone-hive-1
metricDomains=Sort
metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
metricDomains.Sort.snapshotInterval=60000
+prometheusHttpPort=8080
diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml
index c159241..517b7d3 100644
--- a/inlong-sort-standalone/pom.xml
+++ b/inlong-sort-standalone/pom.xml
@@ -51,6 +51,7 @@
<compiler.source>1.8</compiler.source>
<compiler.target>1.8</compiler.target>
<pulsar.version>2.7.2</pulsar.version>
+ <simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
</properties>
<dependencies>
@@ -96,6 +97,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_httpserver</artifactId>
+ <version>${simpleclient_httpserver.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 265fd04..a2414e0 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -158,6 +158,36 @@ public class CommonPropertiesHolder {
}
/**
+     * Looks up an integer configuration value by key.
+     * <p>
+     * The raw string value is trimmed before it is parsed as a decimal integer. A mapped value that is not a valid
+     * integer results in a {@link NumberFormatException}.
+     * </p>
+     *
+     * @param key to be found
+     * @param defaultValue returned if key is unmapped
+     * @return the parsed value associated with key, or defaultValue if key is unmapped
+     */
+    public static Integer getInteger(String key, Integer defaultValue) {
+        final String rawValue = get().get(key);
+        if (rawValue == null) {
+            // unmapped key: hand back the caller's fallback untouched (it may itself be null)
+            return defaultValue;
+        }
+        return Integer.valueOf(rawValue.trim());
+    }
+
+    /**
+     * Looks up an integer configuration value by key, without a fallback.
+     * <p>
+     * The result is the {@link Integer} wrapper rather than a primitive so that an unmapped key can be reported as
+     * null. If the key does not exist and the return value of this method is assigned directly to a primitive, a
+     * {@link NullPointerException} will be thrown by the unboxing.
+     * </p>
+     *
+     * @param key to be found
+     * @return value associated with key or null if unmapped
+     */
+    public static Integer getInteger(String key) {
+        // delegate to the two-argument overload with "no default"
+        final Integer noDefault = null;
+        return getInteger(key, noDefault);
+    }
+
+ /**
* getClusterId
*
* @return
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
index eec6fe4..e3dd138 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
@@ -20,8 +20,24 @@ package org.apache.inlong.sort.standalone.metrics.prometheus;
import static org.apache.inlong.commons.config.metrics.MetricItemMBean.DOMAIN_SEPARATOR;
import static org.apache.inlong.commons.config.metrics.MetricRegister.JMX_DOMAIN;
import static org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder.KEY_CLUSTER_ID;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_NODE_DURATION;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_READ_FAIL_COUNT;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_READ_FAIL_SIZE;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_READ_SUCCESS_COUNT;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_READ_SUCCESS_SIZE;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_SEND_COUNT;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_SEND_FAIL_COUNT;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_SEND_FAIL_SIZE;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_SEND_SIZE;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_SEND_SUCCESS_COUNT;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_SEND_SUCCESS_SIZE;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_SINK_DURATION;
+import static org.apache.inlong.sort.standalone.metrics.SortMetricItem.M_WHOLE_DURATION;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -40,16 +56,27 @@ import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.exporter.HTTPServer;
+
/**
*
* PrometheusMetricListener
*/
-public class PrometheusMetricListener implements MetricListener {
+public class PrometheusMetricListener extends Collector implements MetricListener {
public static final Logger LOG = LoggerFactory.getLogger(MetricRegister.class);
+ public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
+ public static final int DEFAULT_PROMETHEUS_HTTP_PORT = 8080;
+ public static final String DEFAULT_DIMENSION_LABEL = "dimension";
private SortMetricItem metricItem;
private Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>();
+ private String metricName;
+ protected HTTPServer httpServer;
+ private Map<String, MetricItemValue> dimensionMetricValueMap = new ConcurrentHashMap<>();
+ private List<String> dimensionKeys = new ArrayList<>();
/**
* Constructor
@@ -68,22 +95,30 @@ public class PrometheusMetricListener implements MetricListener {
LOG.error("exception while register mbean:{},error:{}", strBeanName, ex);
}
// prepare metric value map
- metricValueMap.put(SortMetricItem.M_READ_SUCCESS_COUNT, metricItem.readSuccessCount);
- metricValueMap.put(SortMetricItem.M_READ_SUCCESS_SIZE, metricItem.readSuccessSize);
- metricValueMap.put(SortMetricItem.M_READ_FAIL_COUNT, metricItem.readFailCount);
- metricValueMap.put(SortMetricItem.M_READ_FAIL_SIZE, metricItem.readFailSize);
+ metricValueMap.put(M_READ_SUCCESS_COUNT, metricItem.readSuccessCount);
+ metricValueMap.put(M_READ_SUCCESS_SIZE, metricItem.readSuccessSize);
+ metricValueMap.put(M_READ_FAIL_COUNT, metricItem.readFailCount);
+ metricValueMap.put(M_READ_FAIL_SIZE, metricItem.readFailSize);
//
- metricValueMap.put(SortMetricItem.M_SEND_COUNT, metricItem.sendCount);
- metricValueMap.put(SortMetricItem.M_SEND_SIZE, metricItem.sendSize);
+ metricValueMap.put(M_SEND_COUNT, metricItem.sendCount);
+ metricValueMap.put(M_SEND_SIZE, metricItem.sendSize);
//
- metricValueMap.put(SortMetricItem.M_SEND_SUCCESS_COUNT, metricItem.sendSuccessCount);
- metricValueMap.put(SortMetricItem.M_SEND_SUCCESS_SIZE, metricItem.sendSuccessSize);
- metricValueMap.put(SortMetricItem.M_SEND_FAIL_COUNT, metricItem.sendFailCount);
- metricValueMap.put(SortMetricItem.M_SEND_FAIL_SIZE, metricItem.sendFailSize);
+ metricValueMap.put(M_SEND_SUCCESS_COUNT, metricItem.sendSuccessCount);
+ metricValueMap.put(M_SEND_SUCCESS_SIZE, metricItem.sendSuccessSize);
+ metricValueMap.put(M_SEND_FAIL_COUNT, metricItem.sendFailCount);
+ metricValueMap.put(M_SEND_FAIL_SIZE, metricItem.sendFailSize);
//
- metricValueMap.put(SortMetricItem.M_SINK_DURATION, metricItem.sinkDuration);
- metricValueMap.put(SortMetricItem.M_NODE_DURATION, metricItem.nodeDuration);
- metricValueMap.put(SortMetricItem.M_WHOLE_DURATION, metricItem.wholeDuration);
+ metricValueMap.put(M_SINK_DURATION, metricItem.sinkDuration);
+ metricValueMap.put(M_NODE_DURATION, metricItem.nodeDuration);
+ metricValueMap.put(M_WHOLE_DURATION, metricItem.wholeDuration);
+
+ int httpPort = CommonPropertiesHolder.getInteger(KEY_PROMETHEUS_HTTP_PORT, DEFAULT_PROMETHEUS_HTTP_PORT);
+ try {
+ this.httpServer = new HTTPServer(httpPort);
+ } catch (IOException e) {
+ LOG.error("exception while register prometheus http server:{},error:{}", metricName, e.getMessage());
+ }
+ this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL);
}
/**
@@ -95,6 +130,7 @@ public class PrometheusMetricListener implements MetricListener {
@Override
public void snapshot(String domain, List<MetricItemValue> itemValues) {
for (MetricItemValue itemValue : itemValues) {
+ // total
for (Entry<String, MetricValue> entry : itemValue.getMetrics().entrySet()) {
String fieldName = entry.getValue().name;
AtomicLong metricValue = this.metricValueMap.get(fieldName);
@@ -103,6 +139,113 @@ public class PrometheusMetricListener implements MetricListener {
metricValue.addAndGet(fieldValue);
}
}
+            // id dimension
+            String dimensionKey = itemValue.getKey();
+            MetricItemValue dimensionMetricValue = this.dimensionMetricValueMap.get(dimensionKey);
+            // First snapshot seen for this key: create its accumulator. The test has to be on the looked-up value;
+            // testing the key itself never creates an entry and leaves dimensionMetricValue null for the
+            // accumulation loop below.
+            if (dimensionMetricValue == null) {
+                dimensionMetricValue = new MetricItemValue(dimensionKey, new ConcurrentHashMap<String, String>(),
+                        new ConcurrentHashMap<String, MetricValue>());
+                // putIfAbsent + get: always continue with whichever instance is actually stored in the map
+                this.dimensionMetricValueMap.putIfAbsent(dimensionKey, dimensionMetricValue);
+                dimensionMetricValue = this.dimensionMetricValueMap.get(dimensionKey);
+                dimensionMetricValue.getDimensions().putAll(itemValue.getDimensions());
+                // add prometheus label name
+                for (Entry<String, String> entry : itemValue.getDimensions().entrySet()) {
+                    if (!this.dimensionKeys.contains(entry.getKey())) {
+                        this.dimensionKeys.add(entry.getKey());
+                    }
+                }
+            }
+            // count: add this snapshot's values onto the accumulated per-key values
+            for (Entry<String, MetricValue> entry : itemValue.getMetrics().entrySet()) {
+                String fieldName = entry.getValue().name;
+                MetricValue metricValue = dimensionMetricValue.getMetrics().get(fieldName);
+                if (metricValue == null) {
+                    // first value for this field: store a copy instead of adding
+                    metricValue = MetricValue.of(fieldName, entry.getValue().value);
+                    dimensionMetricValue.getMetrics().put(metricValue.name, metricValue);
+                    continue;
+                }
+                metricValue.value += entry.getValue().value;
+            }
+ }
+ }
+
+    /**
+     * Produces the metric families exposed by this collector.
+     * <p>
+     * Two counter families are returned, in this order:
+     * </p>
+     * <ul>
+     * <li>{@code metricName + ".total"}: one sample per metric field, read from metricItem and labelled only with
+     * "dimension" set to the field name;</li>
+     * <li>{@code metricName + ".id"}: for every entry of dimensionMetricValueMap, one sample per metric field,
+     * labelled with dimensionKeys.</li>
+     * </ul>
+     *
+     * @return the total family followed by the id family
+     */
+    @Override
+    public List<MetricFamilySamples> collect() {
+        // metric fields, in the order their samples are emitted
+        final String[] fieldNames = {
+                M_READ_SUCCESS_COUNT, M_READ_SUCCESS_SIZE, M_READ_FAIL_COUNT, M_READ_FAIL_SIZE,
+                M_SEND_COUNT, M_SEND_SIZE,
+                M_SEND_SUCCESS_COUNT, M_SEND_SUCCESS_SIZE, M_SEND_FAIL_COUNT, M_SEND_FAIL_SIZE,
+                M_SINK_DURATION, M_NODE_DURATION, M_WHOLE_DURATION};
+        final List<MetricFamilySamples> families = new ArrayList<>();
+
+        // total: counters of metricItem, index-aligned with fieldNames
+        final AtomicLong[] totals = {
+                metricItem.readSuccessCount, metricItem.readSuccessSize,
+                metricItem.readFailCount, metricItem.readFailSize,
+                metricItem.sendCount, metricItem.sendSize,
+                metricItem.sendSuccessCount, metricItem.sendSuccessSize,
+                metricItem.sendFailCount, metricItem.sendFailSize,
+                metricItem.sinkDuration, metricItem.nodeDuration, metricItem.wholeDuration};
+        final CounterMetricFamily totalFamily = new CounterMetricFamily(metricName + ".total", "help",
+                Arrays.asList("dimension"));
+        for (int i = 0; i < fieldNames.length; i++) {
+            totalFamily.addMetric(Arrays.asList(fieldNames[i]), totals[i].get());
+        }
+        families.add(totalFamily);
+
+        // id dimension: every accumulated entry contributes one sample per metric field
+        final CounterMetricFamily idFamily = new CounterMetricFamily(metricName + ".id", "help",
+                this.dimensionKeys);
+        for (MetricItemValue itemValue : this.dimensionMetricValueMap.values()) {
+            for (String fieldName : fieldNames) {
+                addCounterMetricFamily(fieldName, itemValue, idFamily);
+            }
+        }
+        families.add(idFamily);
+        return families;
+    }
+
+    /**
+     * Adds one sample to idCounter for a single metric field of a single dimension entry.
+     * <p>
+     * Exactly one label value is produced per label name in dimensionKeys, in the same order: the
+     * {@link #DEFAULT_DIMENSION_LABEL} position carries the metric field name, and every other position carries the
+     * entry's value for that dimension, or "-" when the entry has none. Emitting the field name in addition to a
+     * value for every key would yield one label value too many, which CounterMetricFamily.addMetric rejects with an
+     * IllegalArgumentException.
+     * </p>
+     *
+     * @param defaultDemension metric field name; used as the value of the default dimension label and as the key
+     *                         into the entry's metrics
+     * @param itemValue        accumulated dimensions and metric values of one entry
+     * @param idCounter        counter family receiving the sample; expected to use dimensionKeys as label names
+     */
+    private void addCounterMetricFamily(String defaultDemension, MetricItemValue itemValue,
+            CounterMetricFamily idCounter) {
+        Map<String, String> dimensions = itemValue.getDimensions();
+        List<String> labelValues = new ArrayList<>(this.dimensionKeys.size());
+        for (String key : this.dimensionKeys) {
+            if (DEFAULT_DIMENSION_LABEL.equals(key)) {
+                // the default label is filled with the metric field name, not with a dimension value
+                labelValues.add(defaultDemension);
+            } else {
+                labelValues.add(dimensions.getOrDefault(key, "-"));
+            }
+        }
+        // a field that was never reported for this entry is exposed as 0
+        long value = 0L;
+        MetricValue metricValue = itemValue.getMetrics().get(defaultDemension);
+        if (metricValue != null) {
+            value = metricValue.value;
}
+        idCounter.addMetric(labelValues, value);
}
}