You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/10 12:29:21 UTC

[inlong] branch master updated: [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b0fd63b93 [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112)
b0fd63b93 is described below

commit b0fd63b93678151941ef40771572cdff92f75793
Author: Keylchen <11...@users.noreply.github.com>
AuthorDate: Mon Oct 10 19:53:56 2022 +0800

    [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112)
---
 .../metrics/AgentPrometheusMetricListener.java     |  17 +-
 .../agent/metrics/TestPrometheusListener.java      | 215 +++++++++++++++++++++
 2 files changed, 224 insertions(+), 8 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
index b1a7911ae..6e809e7eb 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.metrics;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CounterMetricFamily;
 import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.DefaultExports;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.common.metric.MetricItemValue;
 import org.apache.inlong.common.metric.MetricListener;
@@ -64,8 +65,8 @@ import static org.apache.inlong.common.metric.MetricRegister.JMX_DOMAIN;
  */
 public class AgentPrometheusMetricListener extends Collector implements MetricListener {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(AgentPrometheusMetricListener.class);
     public static final String DEFAULT_DIMENSION_LABEL = "dimension";
+    private static final Logger LOGGER = LoggerFactory.getLogger(AgentPrometheusMetricListener.class);
     protected HTTPServer httpServer;
     private AgentMetricItem metricItem;
     private Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>();
@@ -114,16 +115,14 @@ public class AgentPrometheusMetricListener extends Collector implements MetricLi
         } catch (IOException e) {
             LOGGER.error("exception while register agent prometheus http server,error:{}", e.getMessage());
         }
-        this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL);
-
     }
 
     @Override
     public List<MetricFamilySamples> collect() {
+        DefaultExports.initialize();
         // total
-        CounterMetricFamily totalCounter = new CounterMetricFamily("group=total",
-                "The metrics of agent node.",
-                Arrays.asList("dimension"));
+        CounterMetricFamily totalCounter = new CounterMetricFamily("total", "metrics_of_agent_node_total",
+                Arrays.asList(DEFAULT_DIMENSION_LABEL));
         totalCounter.addMetric(Arrays.asList(M_JOB_RUNNING_COUNT), metricItem.jobRunningCount.get());
         totalCounter.addMetric(Arrays.asList(M_JOB_FATAL_COUNT), metricItem.jobFatalCount.get());
         totalCounter.addMetric(Arrays.asList(M_TASK_RUNNING_COUNT), metricItem.taskRunningCount.get());
@@ -143,8 +142,10 @@ public class AgentPrometheusMetricListener extends Collector implements MetricLi
         mfs.add(totalCounter);
 
         // id dimension
-        CounterMetricFamily idCounter = new CounterMetricFamily("group=id",
-                "The metrics of agent dimensions.", this.dimensionKeys);
+        List<String> dimensionIdKeys = new ArrayList<>();
+        dimensionIdKeys.add(DEFAULT_DIMENSION_LABEL);
+        dimensionIdKeys.addAll(this.dimensionKeys);
+        CounterMetricFamily idCounter = new CounterMetricFamily("id", "metrics_of_agent_dimensions", dimensionIdKeys);
         for (Entry<String, MetricItemValue> entry : this.dimensionMetricValueMap.entrySet()) {
             MetricItemValue itemValue = entry.getValue();
 
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
new file mode 100644
index 000000000..298f87098
--- /dev/null
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricListener;
+import org.apache.inlong.common.metric.MetricListenerRunnable;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.metric.MetricValue;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_FATAL_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_RUNNING_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_FAIL_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_SUCCESS_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_FAIL_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_SUCCESS_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_FAIL_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_SUCCESS_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_FAIL_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_SUCCESS_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_FATAL_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RETRYING_COUNT;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RUNNING_COUNT;
+
+/**
+ * Unit test for the agent Prometheus metric listener.
+ */
+public class TestPrometheusListener {
+
+    protected static final AtomicLong METRIC_INDEX = new AtomicLong(0); // numeric suffix of the metric set name
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestPrometheusListener.class);
+    private static final Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>(); // totals by metric
+    protected static AgentMetricItemSet metricItemSet; // created and registered in setup()
+    protected static Map<String, String> dimensions; // dimensions used to look up the metric item
+    private static AgentMetricItem metricItem = new AgentMetricItem(); // its counters back metricValueMap
+    private final Map<String, MetricItemValue> dimensionMetricValueMap = new ConcurrentHashMap<>(); // by key
+    private final List<String> dimensionKeys = new ArrayList<>(); // dimension names collected by the listener
+
+    /** Registers the shared metric item set and maps each metric constant to its {@code metricItem} counter. */
+    @BeforeClass
+    public static void setup() {
+        dimensions = new HashMap<>();
+        dimensions.put(KEY_PLUGIN_ID, TestPrometheusListener.class.getSimpleName());
+        dimensions.put(KEY_INLONG_GROUP_ID, "groupId_test1");
+        dimensions.put(KEY_INLONG_STREAM_ID, "streamId");
+        String metricName = String.join("-", TestPrometheusListener.class.getSimpleName(),
+                String.valueOf(METRIC_INDEX.incrementAndGet()));
+        metricItemSet = new AgentMetricItemSet(metricName);
+        MetricRegister.register(metricItemSet);
+        // JUnit's assertEquals takes (expected, actual); the reversed order garbles failure messages
+        Assert.assertEquals("TestPrometheusListener-1", metricItemSet.getName());
+        metricValueMap.put(M_JOB_RUNNING_COUNT, metricItem.jobRunningCount);
+        metricValueMap.put(M_JOB_FATAL_COUNT, metricItem.jobFatalCount);
+        // task
+        metricValueMap.put(M_TASK_RUNNING_COUNT, metricItem.taskRunningCount);
+        metricValueMap.put(M_TASK_RETRYING_COUNT, metricItem.taskRetryingCount);
+        metricValueMap.put(M_TASK_FATAL_COUNT, metricItem.taskFatalCount);
+        // sink
+        metricValueMap.put(M_SINK_SUCCESS_COUNT, metricItem.sinkSuccessCount);
+        metricValueMap.put(M_SINK_FAIL_COUNT, metricItem.sinkFailCount);
+        // source
+        metricValueMap.put(M_SOURCE_SUCCESS_COUNT, metricItem.sourceSuccessCount);
+        metricValueMap.put(M_SOURCE_FAIL_COUNT, metricItem.sourceFailCount);
+        // plugin
+        metricValueMap.put(M_PLUGIN_READ_COUNT, metricItem.pluginReadCount);
+        metricValueMap.put(M_PLUGIN_SEND_COUNT, metricItem.pluginSendCount);
+        metricValueMap.put(M_PLUGIN_READ_FAIL_COUNT, metricItem.pluginReadFailCount);
+        metricValueMap.put(M_PLUGIN_SEND_FAIL_COUNT, metricItem.pluginSendFailCount);
+        metricValueMap.put(M_PLUGIN_READ_SUCCESS_COUNT, metricItem.pluginReadSuccessCount);
+        metricValueMap.put(M_PLUGIN_SEND_SUCCESS_COUNT, metricItem.pluginSendSuccessCount);
+    }
+
+    /**
+     * Aggregates one snapshot via a local listener, verifies it, then passes it to the Prometheus listener.
+     */
+    @Test
+    public void testSnapshot() {
+        metricItem = metricItemSet.findMetricItem(dimensions);
+        metricItem.pluginReadFailCount.incrementAndGet();
+        metricItem.pluginReadSuccessCount.incrementAndGet();
+        // report
+        MetricListener listener = new MetricListener() {
+            @Override
+            public void snapshot(String domain, List<MetricItemValue> itemValues) {
+                for (MetricItemValue itemValue : itemValues) {
+                    String key = itemValue.getKey();
+                    LOGGER.info("KEY : {}", key);
+                    // total
+                    for (MetricValue metric : itemValue.getMetrics().values()) {
+                        AtomicLong metricValue = metricValueMap.get(metric.name);
+                        if (metricValue != null) {
+                            metricValue.addAndGet(metric.value);
+                            // fixed offset: the assertion after run() expects 101 for a single increment
+                            metricValue.addAndGet(100);
+                        }
+                    }
+                    // dimension
+                    MetricItemValue dimensionMetricValue = dimensionMetricValueMap.get(key);
+                    if (dimensionMetricValue == null) {
+                        dimensionMetricValue = new MetricItemValue(key,
+                                new ConcurrentHashMap<String, String>(),
+                                new ConcurrentHashMap<String, MetricValue>());
+                        dimensionMetricValueMap.putIfAbsent(key, dimensionMetricValue);
+                        dimensionMetricValue = dimensionMetricValueMap.get(key);
+                        dimensionMetricValue.getDimensions().putAll(itemValue.getDimensions());
+                        // add prometheus label name
+                        for (String labelName : itemValue.getDimensions().keySet()) {
+                            if (!dimensionKeys.contains(labelName)) {
+                                dimensionKeys.add(labelName);
+                            }
+                        }
+                    }
+                    // count
+                    for (MetricValue metric : itemValue.getMetrics().values()) {
+                        MetricValue metricValue = dimensionMetricValue.getMetrics().get(metric.name);
+                        if (metricValue == null) {
+                            metricValue = MetricValue.of(metric.name, metric.value);
+                            dimensionMetricValue.getMetrics().put(metricValue.name, metricValue);
+                            continue;
+                        }
+                        metricValue.value += metric.value;
+                    }
+                }
+            }
+        };
+        // JUnit's assertEquals takes (expected, actual)
+        Assert.assertEquals(1, metricItem.pluginReadSuccessCount.get());
+        Assert.assertEquals(1, metricItem.pluginReadFailCount.get());
+        List<MetricListener> listeners = new ArrayList<>();
+        listeners.add(listener);
+        MetricListenerRunnable runnable = new MetricListenerRunnable("Agent", listeners);
+        runnable.run();
+        Assert.assertEquals(101, metricValueMap.get("pluginReadFailCount").intValue());
+        Assert.assertTrue(
+                dimensionMetricValueMap.toString().contains("{\"name\":\"pluginReadSuccessCount\",\"value\":1}"));
+        Assert.assertTrue(dimensionKeys.contains("inlongGroupId"));
+        for (Entry<String, MetricItemValue> entry : this.dimensionMetricValueMap.entrySet()) {
+            MetricItemValue itemValue = entry.getValue();
+            // JOB
+            addMetric(M_JOB_RUNNING_COUNT, itemValue);
+            addMetric(M_JOB_FATAL_COUNT, itemValue);
+            // TASK
+            addMetric(M_TASK_RUNNING_COUNT, itemValue);
+            addMetric(M_TASK_RETRYING_COUNT, itemValue);
+            addMetric(M_TASK_FATAL_COUNT, itemValue);
+            // SINK
+            addMetric(M_SINK_SUCCESS_COUNT, itemValue);
+            addMetric(M_SINK_FAIL_COUNT, itemValue);
+            // SOURCE
+            addMetric(M_SOURCE_SUCCESS_COUNT, itemValue);
+            addMetric(M_SOURCE_FAIL_COUNT, itemValue);
+            // PLUGIN
+            addMetric(M_PLUGIN_READ_COUNT, itemValue);
+            addMetric(M_PLUGIN_SEND_COUNT, itemValue);
+            addMetric(M_PLUGIN_READ_FAIL_COUNT, itemValue);
+            addMetric(M_PLUGIN_SEND_FAIL_COUNT, itemValue);
+            addMetric(M_PLUGIN_READ_SUCCESS_COUNT, itemValue);
+            addMetric(M_PLUGIN_SEND_SUCCESS_COUNT, itemValue);
+        }
+        List<MetricItemValue> metricItemValueList = new ArrayList<>(dimensionMetricValueMap.values());
+        AgentPrometheusMetricListener agentPrometheusMetricListener = new AgentPrometheusMetricListener();
+        agentPrometheusMetricListener.snapshot("Agent", metricItemValueList);
+        LOGGER.debug(agentPrometheusMetricListener.collect().toString());
+    }
+
+    /**
+     * Logs the label values (metric name first, then one value per collected
+     * dimension key, "-" when absent) and the current value of the given metric.
+     */
+    private void addMetric(String defaultDimension, MetricItemValue itemValue) {
+        // one slot for the metric name plus one per dimension key
+        List<String> labelValues = new ArrayList<>(this.dimensionKeys.size() + 1);
+        labelValues.add(defaultDimension);
+        Map<String, String> itemDimensions = itemValue.getDimensions();
+        for (String key : this.dimensionKeys) {
+            labelValues.add(itemDimensions.getOrDefault(key, "-"));
+        }
+        MetricValue metricValue = itemValue.getMetrics().get(defaultDimension);
+        long value = metricValue == null ? 0L : metricValue.value;
+        LOGGER.debug("labelValues is {} and value is {}", labelValues, value);
+    }
+}