You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2021/12/13 07:21:38 UTC
[incubator-inlong] branch master updated: [INLONG-1926] Inlong-Sort-Standalone support JMX metrics listener for pulling. (#1951)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 164c42e [INLONG-1926] Inlong-Sort-Standalone support JMX metrics listener for pulling. (#1951)
164c42e is described below
commit 164c42ef0f4cf814195a937ebb849df32cee9b1a
Author: 卢春亮 <94...@qq.com>
AuthorDate: Mon Dec 13 15:21:35 2021 +0800
[INLONG-1926] Inlong-Sort-Standalone support JMX metrics listener for pulling. (#1951)
* [INLONG-1926] Inlong-Sort-Standalone support JMX metrics listener for pulling.
* fix PR review problem
* fix PR review problem
---
inlong-sort-standalone/conf/common.properties | 4 +
.../prometheus/PrometheusMetricListener.java | 108 +++++++++++++++++++++
2 files changed, 112 insertions(+)
diff --git a/inlong-sort-standalone/conf/common.properties b/inlong-sort-standalone/conf/common.properties
index 797d3a5..c794da7 100644
--- a/inlong-sort-standalone/conf/common.properties
+++ b/inlong-sort-standalone/conf/common.properties
@@ -17,3 +17,7 @@
# under the License.
#
clusterName=sort-standalone-hive-1
+
+metricDomains=Sort
+metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
+metricDomains.Sort.snapshotInterval=60000
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
new file mode 100644
index 0000000..eec6fe4
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics.prometheus;
+
+import static org.apache.inlong.commons.config.metrics.MetricItemMBean.DOMAIN_SEPARATOR;
+import static org.apache.inlong.commons.config.metrics.MetricRegister.JMX_DOMAIN;
+import static org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder.KEY_CLUSTER_ID;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.metrics.MetricItemValue;
+import org.apache.inlong.sort.standalone.metrics.MetricListener;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MetricListener} that accumulates snapshot metric values into a single
+ * {@link SortMetricItem} which is registered as a JMX MBean on the platform MBean server.
+ *
+ * <p>Each call to {@link #snapshot(String, List)} adds the reported values to the
+ * matching counters of that item, so the exposed counters only ever receive additions
+ * from this class. NOTE(review): presumably the MBean is read by a pull-based collector
+ * such as a Prometheus JMX exporter - confirm against the deployment.
+ */
+public class PrometheusMetricListener implements MetricListener {
+
+    // Bound to this class (the original used MetricRegister.class, which mis-attributed log records).
+    public static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricListener.class);
+
+    /** The metric item exposed through JMX; its counters are the accumulation targets. */
+    private final SortMetricItem metricItem;
+    /** Lookup from metric name to the counter of {@link #metricItem} that accumulates it. */
+    private final Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>();
+
+    /**
+     * Creates the listener, registers its metric item as an MBean and prepares the
+     * metric-name to counter lookup used by {@link #snapshot(String, List)}.
+     *
+     * <p>A failure to register the MBean is logged and not rethrown; the listener is
+     * still constructed and keeps accumulating values in that case.
+     */
+    public PrometheusMetricListener() {
+        this.metricItem = new SortMetricItem();
+        this.metricItem.clusterId = CommonPropertiesHolder.getString(KEY_CLUSTER_ID);
+        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        // NOTE(review): "DataProxyCounter" looks carried over from another module; it is kept
+        // unchanged because external scrapers may already depend on this object name.
+        final String strBeanName = new StringBuilder()
+                .append(JMX_DOMAIN)
+                .append(DOMAIN_SEPARATOR)
+                .append("type=DataProxyCounter")
+                .toString();
+        try {
+            ObjectName objName = new ObjectName(strBeanName);
+            mbs.registerMBean(metricItem, objName);
+        } catch (Exception ex) {
+            // Pass the message for the second placeholder and the exception itself last,
+            // so that both placeholders are filled and the stack trace is still logged.
+            LOG.error("exception while register mbean:{},error:{}", strBeanName, ex.getMessage(), ex);
+        }
+        // prepare metric value map: read side
+        metricValueMap.put(SortMetricItem.M_READ_SUCCESS_COUNT, metricItem.readSuccessCount);
+        metricValueMap.put(SortMetricItem.M_READ_SUCCESS_SIZE, metricItem.readSuccessSize);
+        metricValueMap.put(SortMetricItem.M_READ_FAIL_COUNT, metricItem.readFailCount);
+        metricValueMap.put(SortMetricItem.M_READ_FAIL_SIZE, metricItem.readFailSize);
+        // send attempts
+        metricValueMap.put(SortMetricItem.M_SEND_COUNT, metricItem.sendCount);
+        metricValueMap.put(SortMetricItem.M_SEND_SIZE, metricItem.sendSize);
+        // send results
+        metricValueMap.put(SortMetricItem.M_SEND_SUCCESS_COUNT, metricItem.sendSuccessCount);
+        metricValueMap.put(SortMetricItem.M_SEND_SUCCESS_SIZE, metricItem.sendSuccessSize);
+        metricValueMap.put(SortMetricItem.M_SEND_FAIL_COUNT, metricItem.sendFailCount);
+        metricValueMap.put(SortMetricItem.M_SEND_FAIL_SIZE, metricItem.sendFailSize);
+        // durations
+        metricValueMap.put(SortMetricItem.M_SINK_DURATION, metricItem.sinkDuration);
+        metricValueMap.put(SortMetricItem.M_NODE_DURATION, metricItem.nodeDuration);
+        metricValueMap.put(SortMetricItem.M_WHOLE_DURATION, metricItem.wholeDuration);
+    }
+
+    /**
+     * Adds every metric value of the given snapshot to the counter registered under the
+     * same metric name. Metrics whose name has no registered counter are ignored.
+     *
+     * @param domain     the metric domain of the snapshot; not used by this listener
+     * @param itemValues the metric items of the snapshot; a {@code null} list is treated as empty
+     */
+    @Override
+    public void snapshot(String domain, List<MetricItemValue> itemValues) {
+        if (itemValues == null) {
+            return;
+        }
+        for (MetricItemValue itemValue : itemValues) {
+            for (Entry<String, MetricValue> entry : itemValue.getMetrics().entrySet()) {
+                MetricValue reported = entry.getValue();
+                String fieldName = reported.name;
+                if (fieldName == null) {
+                    // ConcurrentHashMap.get(null) throws NullPointerException; skip unnamed metrics.
+                    continue;
+                }
+                AtomicLong metricValue = this.metricValueMap.get(fieldName);
+                if (metricValue != null) {
+                    metricValue.addAndGet(reported.value);
+                }
+            }
+        }
+    }
+}