You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2021/12/13 07:21:38 UTC

[incubator-inlong] branch master updated: [INLONG-1926] Inlong-Sort-Standalone support JMX metrics listener for pulling. (#1951)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 164c42e  [INLONG-1926] Inlong-Sort-Standalone support JMX metrics listener for pulling. (#1951)
164c42e is described below

commit 164c42ef0f4cf814195a937ebb849df32cee9b1a
Author: 卢春亮 <94...@qq.com>
AuthorDate: Mon Dec 13 15:21:35 2021 +0800

    [INLONG-1926] Inlong-Sort-Standalone support JMX metrics listener for pulling. (#1951)
    
    * [INLONG-1926] Inlong-Sort-Standalone support JMX metrics listener for pulling.
    
    * fix PR review problem
    
    * fix PR review problem
---
 inlong-sort-standalone/conf/common.properties      |   4 +
 .../prometheus/PrometheusMetricListener.java       | 108 +++++++++++++++++++++
 2 files changed, 112 insertions(+)

diff --git a/inlong-sort-standalone/conf/common.properties b/inlong-sort-standalone/conf/common.properties
index 797d3a5..c794da7 100644
--- a/inlong-sort-standalone/conf/common.properties
+++ b/inlong-sort-standalone/conf/common.properties
@@ -17,3 +17,7 @@
 # under the License.
 #
 clusterName=sort-standalone-hive-1
+
+metricDomains=Sort
+metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
+metricDomains.Sort.snapshotInterval=60000
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
new file mode 100644
index 0000000..eec6fe4
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics.prometheus;
+
+import static org.apache.inlong.commons.config.metrics.MetricItemMBean.DOMAIN_SEPARATOR;
+import static org.apache.inlong.commons.config.metrics.MetricRegister.JMX_DOMAIN;
+import static org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder.KEY_CLUSTER_ID;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.metrics.MetricItemValue;
+import org.apache.inlong.sort.standalone.metrics.MetricListener;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MetricListener that adds every snapshot value to the counters of one {@link SortMetricItem},
+ * which is registered as an MBean on the platform MBean server.
+ */
+public class PrometheusMetricListener implements MetricListener {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricListener.class);
+
+    private final SortMetricItem metricItem;
+    private final Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>();
+
+    /**
+     * Creates the aggregate item, registers it as an MBean and indexes its counters by metric name.
+     */
+    public PrometheusMetricListener() {
+        this.metricItem = new SortMetricItem();
+        this.metricItem.clusterId = CommonPropertiesHolder.getString(KEY_CLUSTER_ID);
+        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        // NOTE(review): type "DataProxyCounter" in a Sort module looks copied; confirm before renaming
+        final String strBeanName = new StringBuilder().append(JMX_DOMAIN).append(DOMAIN_SEPARATOR)
+                .append("type=DataProxyCounter").toString();
+        try {
+            mbs.registerMBean(metricItem, new ObjectName(strBeanName));
+        } catch (Exception ex) {
+            // registration is best effort; the trailing throwable is logged with its stack trace by SLF4J
+            LOG.error("exception while register mbean:{}", strBeanName, ex);
+        }
+        // index the read counters by metric name so snapshot() can look them up
+        metricValueMap.put(SortMetricItem.M_READ_SUCCESS_COUNT, metricItem.readSuccessCount);
+        metricValueMap.put(SortMetricItem.M_READ_SUCCESS_SIZE, metricItem.readSuccessSize);
+        metricValueMap.put(SortMetricItem.M_READ_FAIL_COUNT, metricItem.readFailCount);
+        metricValueMap.put(SortMetricItem.M_READ_FAIL_SIZE, metricItem.readFailSize);
+        // send totals
+        metricValueMap.put(SortMetricItem.M_SEND_COUNT, metricItem.sendCount);
+        metricValueMap.put(SortMetricItem.M_SEND_SIZE, metricItem.sendSize);
+        // send results, split into success and failure
+        metricValueMap.put(SortMetricItem.M_SEND_SUCCESS_COUNT, metricItem.sendSuccessCount);
+        metricValueMap.put(SortMetricItem.M_SEND_SUCCESS_SIZE, metricItem.sendSuccessSize);
+        metricValueMap.put(SortMetricItem.M_SEND_FAIL_COUNT, metricItem.sendFailCount);
+        metricValueMap.put(SortMetricItem.M_SEND_FAIL_SIZE, metricItem.sendFailSize);
+        // duration accumulators
+        metricValueMap.put(SortMetricItem.M_SINK_DURATION, metricItem.sinkDuration);
+        metricValueMap.put(SortMetricItem.M_NODE_DURATION, metricItem.nodeDuration);
+        metricValueMap.put(SortMetricItem.M_WHOLE_DURATION, metricItem.wholeDuration);
+    }
+
+    /**
+     * Adds each snapshot metric to the cumulative counter registered under the same metric name.
+     *
+     * @param domain     metric domain of the snapshot; not used by this listener
+     * @param itemValues snapshot items whose metric values are added to the counters
+     */
+    @Override
+    public void snapshot(String domain, List<MetricItemValue> itemValues) {
+        for (MetricItemValue itemValue : itemValues) {
+            for (Entry<String, MetricValue> entry : itemValue.getMetrics().entrySet()) {
+                // counters are looked up by the metric's own name field; unknown names are skipped
+                MetricValue metric = entry.getValue();
+                AtomicLong total = this.metricValueMap.get(metric.name);
+                if (total != null) {
+                    total.addAndGet(metric.value);
+                }
+            }
+        }
+    }
+}