You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/19 07:03:23 UTC

[incubator-inlong] branch master updated: [INLONG-2106] DataProxy expose metric data using prometheus HttpServer. (#2165)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e60f461  [INLONG-2106] DataProxy expose metric data using prometheus HttpServer. (#2165)
e60f461 is described below

commit e60f461b9ea1b53401a994139395b756f50ac470
Author: 卢春亮 <94...@qq.com>
AuthorDate: Wed Jan 19 15:03:18 2022 +0800

    [INLONG-2106] DataProxy expose metric data using prometheus HttpServer. (#2165)
---
 inlong-dataproxy/conf/common.properties            |   3 +-
 .../config/holder/CommonPropertiesHolder.java      |  30 ++++
 .../prometheus/PrometheusMetricListener.java       | 183 ++++++++++++++++++---
 inlong-dataproxy/pom.xml                           |   6 +
 4 files changed, 201 insertions(+), 21 deletions(-)

diff --git a/inlong-dataproxy/conf/common.properties b/inlong-dataproxy/conf/common.properties
index a7fc28c..f3a112b 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -25,4 +25,5 @@ configCheckInterval=10000
 
 metricDomains=DataProxy
 metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
-metricDomains.DataProxy.snapshotInterval=60000
\ No newline at end of file
+metricDomains.DataProxy.snapshotInterval=60000
+prometheusHttpPort=8080
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index adbaba0..0d12c43 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -120,6 +120,36 @@ public class CommonPropertiesHolder {
     }
 
     /**
+     * Gets value mapped to key, returning defaultValue if unmapped.
+     * 
+     * @param  key          to be found
+     * @param  defaultValue returned if key is unmapped
+     * @return              value associated with key
+     */
+    public static Integer getInteger(String key, Integer defaultValue) {
+        final String raw = get().get(key);
+        // Unmapped key: hand back the caller's fallback untouched (it may itself be null).
+        // Mapped key: parse the trimmed text as a decimal int; malformed text raises
+        // NumberFormatException, exactly as Integer.parseInt would.
+        return raw == null ? defaultValue : Integer.valueOf(raw.trim());
+    }
+
+    /**
+     * Gets value mapped to key, returning null if unmapped.
+     * <p>
+     * Note that this method returns an object as opposed to a primitive. The configuration key requested may not be
+     * mapped to a value, and by returning the primitive object wrapper we can return null. If the key does not exist
+     * and the return value of this method is assigned directly to a primitive, a {@link NullPointerException} will
+     * be thrown.
+     * </p>
+     * <p>
+     * Delegates to {@link #getInteger(String, Integer)} with a null default.
+     * </p>
+     * 
+     * @param  key to be found
+     * @return     value associated with key or null if unmapped
+     */
+    public static Integer getInteger(String key) {
+        return getInteger(key, null);
+    }
+
+    /**
      * getAuditFormatInterval
      * 
      * @return
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
index 97e1133..2dcface 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
@@ -19,8 +19,24 @@ package org.apache.inlong.dataproxy.metrics.prometheus;
 
 import static org.apache.inlong.commons.config.metrics.MetricItemMBean.DOMAIN_SEPARATOR;
 import static org.apache.inlong.commons.config.metrics.MetricRegister.JMX_DOMAIN;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_NODE_DURATION;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_READ_FAIL_COUNT;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_READ_FAIL_SIZE;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_READ_SUCCESS_COUNT;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_READ_SUCCESS_SIZE;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_COUNT;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_FAIL_COUNT;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_FAIL_SIZE;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_SIZE;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_SUCCESS_COUNT;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SEND_SUCCESS_SIZE;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_SINK_DURATION;
+import static org.apache.inlong.dataproxy.metrics.DataProxyMetricItem.M_WHOLE_DURATION;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -30,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.commons.config.metrics.MetricValue;
 import org.apache.inlong.dataproxy.config.RemoteConfigManager;
 import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
@@ -40,17 +55,28 @@ import org.apache.inlong.dataproxy.metrics.MetricListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.prometheus.client.Collector;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.exporter.HTTPServer;
+
 /**
  * 
  * PrometheusMetricListener
  */
-public class PrometheusMetricListener implements MetricListener {
+public class PrometheusMetricListener extends Collector implements MetricListener {
 
-    public static final Logger LOG = LoggerFactory.getLogger(MetricRegister.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricListener.class);
+    public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
+    public static final int DEFAULT_PROMETHEUS_HTTP_PORT = 8080;
+    public static final String DEFAULT_DIMENSION_LABEL = "dimension";
 
     //
     private DataProxyMetricItem metricItem;
     private Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>();
+    private String metricName;
+    protected HTTPServer httpServer;
+    private Map<String, MetricItemValue> dimensionMetricValueMap = new ConcurrentHashMap<>();
+    private List<String> dimensionKeys = new ArrayList<>();
 
     /**
      * Constructor
@@ -61,31 +87,38 @@ public class PrometheusMetricListener implements MetricListener {
         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         StringBuilder beanName = new StringBuilder();
         beanName.append(JMX_DOMAIN).append(DOMAIN_SEPARATOR).append("type=DataProxyCounter");
-        String strBeanName = beanName.toString();
+        this.metricName = beanName.toString();
         try {
-            ObjectName objName = new ObjectName(strBeanName);
+            ObjectName objName = new ObjectName(metricName);
             mbs.registerMBean(metricItem, objName);
         } catch (Exception ex) {
-            LOG.error("exception while register mbean:{},error:{}", strBeanName, ex.getMessage());
-            LOG.error(ex.getMessage(), ex);
+            LOG.error("exception while register mbean:{},error:{}", metricName, ex.getMessage());
         }
         //
-        metricValueMap.put(DataProxyMetricItem.M_READ_SUCCESS_COUNT, metricItem.readSuccessCount);
-        metricValueMap.put(DataProxyMetricItem.M_READ_SUCCESS_SIZE, metricItem.readSuccessSize);
-        metricValueMap.put(DataProxyMetricItem.M_READ_FAIL_COUNT, metricItem.readFailCount);
-        metricValueMap.put(DataProxyMetricItem.M_READ_FAIL_SIZE, metricItem.readFailSize);
+        metricValueMap.put(M_READ_SUCCESS_COUNT, metricItem.readSuccessCount);
+        metricValueMap.put(M_READ_SUCCESS_SIZE, metricItem.readSuccessSize);
+        metricValueMap.put(M_READ_FAIL_COUNT, metricItem.readFailCount);
+        metricValueMap.put(M_READ_FAIL_SIZE, metricItem.readFailSize);
         //
-        metricValueMap.put(DataProxyMetricItem.M_SEND_COUNT, metricItem.sendCount);
-        metricValueMap.put(DataProxyMetricItem.M_SEND_SIZE, metricItem.sendSize);
+        metricValueMap.put(M_SEND_COUNT, metricItem.sendCount);
+        metricValueMap.put(M_SEND_SIZE, metricItem.sendSize);
         //
-        metricValueMap.put(DataProxyMetricItem.M_SEND_SUCCESS_COUNT, metricItem.sendSuccessCount);
-        metricValueMap.put(DataProxyMetricItem.M_SEND_SUCCESS_SIZE, metricItem.sendSuccessSize);
-        metricValueMap.put(DataProxyMetricItem.M_SEND_FAIL_COUNT, metricItem.sendFailCount);
-        metricValueMap.put(DataProxyMetricItem.M_SEND_FAIL_SIZE, metricItem.sendFailSize);
+        metricValueMap.put(M_SEND_SUCCESS_COUNT, metricItem.sendSuccessCount);
+        metricValueMap.put(M_SEND_SUCCESS_SIZE, metricItem.sendSuccessSize);
+        metricValueMap.put(M_SEND_FAIL_COUNT, metricItem.sendFailCount);
+        metricValueMap.put(M_SEND_FAIL_SIZE, metricItem.sendFailSize);
         //
-        metricValueMap.put(DataProxyMetricItem.M_SINK_DURATION, metricItem.sinkDuration);
-        metricValueMap.put(DataProxyMetricItem.M_NODE_DURATION, metricItem.nodeDuration);
-        metricValueMap.put(DataProxyMetricItem.M_WHOLE_DURATION, metricItem.wholeDuration);
+        metricValueMap.put(M_SINK_DURATION, metricItem.sinkDuration);
+        metricValueMap.put(M_NODE_DURATION, metricItem.nodeDuration);
+        metricValueMap.put(M_WHOLE_DURATION, metricItem.wholeDuration);
+
+        int httpPort = CommonPropertiesHolder.getInteger(KEY_PROMETHEUS_HTTP_PORT, DEFAULT_PROMETHEUS_HTTP_PORT);
+        try {
+            this.httpServer = new HTTPServer(httpPort);
+        } catch (IOException e) {
+            LOG.error("exception while register prometheus http server:{},error:{}", metricName, e.getMessage());
+        }
+        this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL);
     }
 
     /**
@@ -97,6 +130,7 @@ public class PrometheusMetricListener implements MetricListener {
     @Override
     public void snapshot(String domain, List<MetricItemValue> itemValues) {
         for (MetricItemValue itemValue : itemValues) {
+            // total
             for (Entry<String, MetricValue> entry : itemValue.getMetrics().entrySet()) {
                 String fieldName = entry.getValue().name;
                 AtomicLong metricValue = this.metricValueMap.get(fieldName);
@@ -105,7 +139,116 @@ public class PrometheusMetricListener implements MetricListener {
                     metricValue.addAndGet(fieldValue);
                 }
             }
+            // per-id accumulation: find (or lazily create) the aggregate that belongs to this item's key
+            String dimensionKey = itemValue.getKey();
+            MetricItemValue dimensionMetricValue = this.dimensionMetricValueMap.get(dimensionKey);
+            // test the looked-up aggregate, not the key: a missing aggregate must be created before it is used below
+            if (dimensionMetricValue == null) {
+                dimensionMetricValue = new MetricItemValue(dimensionKey, new ConcurrentHashMap<String, String>(),
+                        new ConcurrentHashMap<String, MetricValue>());
+                // putIfAbsent + get: if another caller registered the key first, keep using its instance
+                this.dimensionMetricValueMap.putIfAbsent(dimensionKey, dimensionMetricValue);
+                dimensionMetricValue = this.dimensionMetricValueMap.get(dimensionKey);
+                dimensionMetricValue.getDimensions().putAll(itemValue.getDimensions());
+                // remember every dimension name seen so far; collect() uses them as prometheus label names
+                for (Entry<String, String> entry : itemValue.getDimensions().entrySet()) {
+                    if (!this.dimensionKeys.contains(entry.getKey())) {
+                        this.dimensionKeys.add(entry.getKey());
+                    }
+                }
+            }
+            // count
+            for (Entry<String, MetricValue> entry : itemValue.getMetrics().entrySet()) {
+                String fieldName = entry.getValue().name;
+                MetricValue metricValue = dimensionMetricValue.getMetrics().get(fieldName);
+                if (metricValue == null) {
+                    metricValue = MetricValue.of(fieldName, entry.getValue().value);
+                    dimensionMetricValue.getMetrics().put(metricValue.name, metricValue);
+                    continue;
+                }
+                metricValue.value += entry.getValue().value;
+            }
+        }
+    }
+
+    /**
+     * Builds the metric families returned to the Prometheus client: one family holding the node-wide totals read
+     * from {@code metricItem}, and one family holding the per-item values accumulated by {@code snapshot}.
+     * 
+     * @return a list containing the "total" family followed by the "id" family
+     */
+    @Override
+    public List<MetricFamilySamples> collect() {
+        final List<MetricFamilySamples> families = new ArrayList<>();
+
+        // NOTE(review): both family names embed '=' and '&' - confirm the scraper accepts such metric names.
+        // node-wide totals: a single label whose value is the metric field name
+        final CounterMetricFamily totalFamily = new CounterMetricFamily(metricName + "&group=total",
+                "The metrics of dataproxy node.",
+                Arrays.asList("dimension"));
+        // read
+        totalFamily.addMetric(Arrays.asList(M_READ_SUCCESS_COUNT), metricItem.readSuccessCount.get());
+        totalFamily.addMetric(Arrays.asList(M_READ_SUCCESS_SIZE), metricItem.readSuccessSize.get());
+        totalFamily.addMetric(Arrays.asList(M_READ_FAIL_COUNT), metricItem.readFailCount.get());
+        totalFamily.addMetric(Arrays.asList(M_READ_FAIL_SIZE), metricItem.readFailSize.get());
+        // send
+        totalFamily.addMetric(Arrays.asList(M_SEND_COUNT), metricItem.sendCount.get());
+        totalFamily.addMetric(Arrays.asList(M_SEND_SIZE), metricItem.sendSize.get());
+        // send result
+        totalFamily.addMetric(Arrays.asList(M_SEND_SUCCESS_COUNT), metricItem.sendSuccessCount.get());
+        totalFamily.addMetric(Arrays.asList(M_SEND_SUCCESS_SIZE), metricItem.sendSuccessSize.get());
+        totalFamily.addMetric(Arrays.asList(M_SEND_FAIL_COUNT), metricItem.sendFailCount.get());
+        totalFamily.addMetric(Arrays.asList(M_SEND_FAIL_SIZE), metricItem.sendFailSize.get());
+        // duration
+        totalFamily.addMetric(Arrays.asList(M_SINK_DURATION), metricItem.sinkDuration.get());
+        totalFamily.addMetric(Arrays.asList(M_NODE_DURATION), metricItem.nodeDuration.get());
+        totalFamily.addMetric(Arrays.asList(M_WHOLE_DURATION), metricItem.wholeDuration.get());
+        families.add(totalFamily);
+
+        // per-item values: one sample per (item, metric field), always emitted in this field order
+        final CounterMetricFamily idFamily = new CounterMetricFamily(metricName + "&group=id",
+                "The metrics of inlong dataflow.", this.dimensionKeys);
+        final String[] fieldNames = {
+                M_READ_SUCCESS_COUNT, M_READ_SUCCESS_SIZE, M_READ_FAIL_COUNT, M_READ_FAIL_SIZE,
+                M_SEND_COUNT, M_SEND_SIZE,
+                M_SEND_SUCCESS_COUNT, M_SEND_SUCCESS_SIZE, M_SEND_FAIL_COUNT, M_SEND_FAIL_SIZE,
+                M_SINK_DURATION, M_NODE_DURATION, M_WHOLE_DURATION
+        };
+        for (MetricItemValue itemValue : this.dimensionMetricValueMap.values()) {
+            for (String fieldName : fieldNames) {
+                addCounterMetricFamily(fieldName, itemValue, idFamily);
+            }
         }
+        families.add(idFamily);
+        return families;
     }
 
+    /**
+     * Adds one sample to {@code idCounter} for a single metric field of a single item.
+     * <p>
+     * The label values are produced in the same order, and in the same number, as the entries of
+     * {@code dimensionKeys}, which is the label-name list {@code collect()} hands to the family. The slot of the
+     * {@link #DEFAULT_DIMENSION_LABEL} key carries the metric field name; every other slot carries the item's value
+     * for that dimension, or "-" when the item does not define it.
+     * </p>
+     *
+     * @param defaultDimension metric field name; used both as the value of the default label and as the lookup key
+     *                         into the item's metrics
+     * @param itemValue        aggregated dimensions and metrics of one item
+     * @param idCounter        family that receives the sample; 0 is reported when the item has no such field yet
+     */
+    private void addCounterMetricFamily(String defaultDimension, MetricItemValue itemValue,
+            CounterMetricFamily idCounter) {
+        Map<String, String> dimensions = itemValue.getDimensions();
+        List<String> labelValues = new ArrayList<>(this.dimensionKeys.size());
+        for (String key : this.dimensionKeys) {
+            // dimensionKeys already contains the default label, so fill its slot here rather than prepending an
+            // extra value: one value too many makes the list longer than the family's label names
+            if (DEFAULT_DIMENSION_LABEL.equals(key)) {
+                labelValues.add(defaultDimension);
+            } else {
+                labelValues.add(dimensions.getOrDefault(key, "-"));
+            }
+        }
+        MetricValue metricValue = itemValue.getMetrics().get(defaultDimension);
+        long value = (metricValue != null) ? metricValue.value : 0L;
+        idCounter.addMetric(labelValues, value);
+    }
 }
diff --git a/inlong-dataproxy/pom.xml b/inlong-dataproxy/pom.xml
index 2494f5a..1eb58f5 100644
--- a/inlong-dataproxy/pom.xml
+++ b/inlong-dataproxy/pom.xml
@@ -50,6 +50,7 @@
         <testng.version>6.14.3</testng.version>
         <guava.version>19.0</guava.version>
         <powermock.version>2.0.9</powermock.version>
+        <simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
     </properties>
 
     <dependencies>
@@ -115,6 +116,11 @@
             <version>${guava.version}</version>
         </dependency>
         <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_httpserver</artifactId>
+            <version>${simpleclient_httpserver.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.powermock</groupId>
             <artifactId>powermock-module-junit4</artifactId>
             <version>${powermock.version}</version>