You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/01/23 05:50:58 UTC
[incubator-inlong] branch master updated: [INLONG-2263] SortStandalone get metric value with error JMX ObjectName (#2265)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 18bb2e1 [INLONG-2263] SortStandalone get metric value with error JMX ObjectName (#2265)
18bb2e1 is described below
commit 18bb2e148ae90bdc034a5f43bd6e0662b67cf54f
Author: 卢春亮 <94...@qq.com>
AuthorDate: Sun Jan 23 13:50:50 2022 +0800
[INLONG-2263] SortStandalone get metric value with error JMX ObjectName (#2265)
* [INLONG-2263] SortStandalone get metric value with error JMX ObjectName
* fix PrometheusMetricListener.java
* add UT for this bug
---
.../standalone/metrics/MetricListenerRunnable.java | 15 +-
.../metrics/TestMetricListenerRunnable.java | 152 ++++++++++++++++++
.../standalone/metrics/TestSortMetricItemSet.java | 170 +++++++++++++++++++++
.../prometheus/PrometheusMetricListener.java | 19 ++-
.../inlong/sort/standalone/sink/hive/HiveSink.java | 1 +
.../sink/hive/WriteHdfsFileRunnable.java | 5 +
6 files changed, 350 insertions(+), 12 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
index 0cdf29f..be1b9db 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
@@ -36,9 +36,11 @@ import org.apache.commons.lang.ClassUtils;
import org.apache.inlong.commons.config.metrics.MetricItem;
import org.apache.inlong.commons.config.metrics.MetricItemMBean;
import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricUtils;
import org.apache.inlong.commons.config.metrics.MetricValue;
-import org.slf4j.Logger;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
/**
*
@@ -46,7 +48,7 @@ import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
*/
public class MetricListenerRunnable implements Runnable {
- public static final Logger LOG = InlongLoggerFactory.getLogger(MetricObserver.class);
+ public static final Logger LOG = InlongLoggerFactory.getLogger(MetricListenerRunnable.class);
private String domain;
private List<MetricListener> listenerList;
@@ -92,9 +94,14 @@ public class MetricListenerRunnable implements Runnable {
* @throws ClassNotFoundException
*/
@SuppressWarnings("unchecked")
- private List<MetricItemValue> getItemValues() throws InstanceNotFoundException, AttributeNotFoundException,
+ public List<MetricItemValue> getItemValues() throws InstanceNotFoundException, AttributeNotFoundException,
ReflectionException, MBeanException, MalformedObjectNameException, ClassNotFoundException {
- ObjectName objName = new ObjectName(domain + MetricItemMBean.DOMAIN_SEPARATOR + "*");
+ StringBuilder beanName = new StringBuilder();
+ beanName.append(MetricRegister.JMX_DOMAIN).append(MetricItemMBean.DOMAIN_SEPARATOR)
+ .append("type=").append(MetricUtils.getDomain(SortMetricItemSet.class))
+ .append(MetricItemMBean.PROPERTY_SEPARATOR)
+ .append("*");
+ ObjectName objName = new ObjectName(beanName.toString());
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null);
LOG.info("getItemValues for domain:{},queryMBeans:{}", domain, mbeans);
diff --git a/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java b/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java
new file mode 100644
index 0000000..d8309c4
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricUtils;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * TestMetricItemSetMBean
+ */
+public class TestMetricListenerRunnable {
+
+ public static final String CLUSTER_ID = "inlong5th_sz";
+ public static final String CONTAINER_NAME = "2222.inlong.Sort.sz100001";
+ public static final String CONTAINER_IP = "127.0.0.1";
+ private static final String SOURCE_ID = "agent-source";
+ private static final String SOURCE_DATA_ID = "12069";
+ private static final String INLONG_GROUP_ID1 = "03a00000026";
+ private static final String INLONG_GROUP_ID2 = "03a00000126";
+ private static final String INLONG_STREAM_ID = "";
+ private static final String SINK_ID = "inlong5th-pulsar-sz";
+ private static final String SINK_DATA_ID = "PULSAR_TOPIC_1";
+ private static SortMetricItemSet itemSet;
+ private static Map<String, String> dimSource;
+ private static Map<String, String> dimSink;
+ private static String keySource1;
+ private static String keySource2;
+ private static String keySink1;
+ private static String keySink2;
+
+ /**
+ * setup
+ */
+ @BeforeClass
+ public static void setup() {
+ itemSet = new SortMetricItemSet(CLUSTER_ID);
+ MetricRegister.register(itemSet);
+ // prepare
+ SortMetricItem itemSource = new SortMetricItem();
+ itemSource.clusterId = CLUSTER_ID;
+ itemSource.sourceId = SOURCE_ID;
+ itemSource.sourceDataId = SOURCE_DATA_ID;
+ itemSource.inlongGroupId = INLONG_GROUP_ID1;
+ itemSource.inlongStreamId = INLONG_STREAM_ID;
+ dimSource = itemSource.getDimensions();
+ //
+ SortMetricItem itemSink = new SortMetricItem();
+ itemSink.clusterId = CLUSTER_ID;
+ itemSink.sinkId = SINK_ID;
+ itemSink.sinkDataId = SINK_DATA_ID;
+ itemSink.inlongGroupId = INLONG_GROUP_ID1;
+ itemSink.inlongStreamId = INLONG_STREAM_ID;
+ dimSink = itemSink.getDimensions();
+ }
+
+ /**
+ * testResult
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testResult() throws Exception {
+ // increase source
+ SortMetricItem item = null;
+ item = itemSet.findMetricItem(dimSource);
+ item.readSuccessCount.incrementAndGet();
+ item.readSuccessSize.addAndGet(100);
+ keySource1 = MetricUtils.getDimensionsKey(dimSource);
+ //
+ dimSource.put("inlongGroupId", INLONG_GROUP_ID2);
+ item = itemSet.findMetricItem(dimSource);
+ item.readFailCount.addAndGet(20);
+ item.readFailSize.addAndGet(2000);
+ keySource2 = MetricUtils.getDimensionsKey(dimSource);
+ // increase sink
+ item = itemSet.findMetricItem(dimSink);
+ item.sendCount.incrementAndGet();
+ item.sendSize.addAndGet(100);
+ item.sendSuccessCount.incrementAndGet();
+ item.sendSuccessSize.addAndGet(100);
+ keySink1 = MetricUtils.getDimensionsKey(dimSink);
+ //
+ dimSink.put("inlongGroupId", INLONG_GROUP_ID2);
+ item = itemSet.findMetricItem(dimSink);
+ item.sendCount.addAndGet(20);
+ item.sendSize.addAndGet(2000);
+ item.sendFailCount.addAndGet(20);
+ item.sendFailSize.addAndGet(2000);
+ keySink2 = MetricUtils.getDimensionsKey(dimSink);
+ // report
+ MetricListener listener = new MetricListener() {
+
+ @Override
+ public void snapshot(String domain, List<MetricItemValue> itemValues) {
+ assertEquals("Sort", domain);
+ for (MetricItemValue itemValue : itemValues) {
+ String key = itemValue.getKey();
+ Map<String, MetricValue> metricMap = itemValue.getMetrics();
+ if (keySource1.equals(itemValue.getKey())) {
+ assertEquals(1, metricMap.get("readSuccessCount").value);
+ assertEquals(100, metricMap.get("readSuccessSize").value);
+ } else if (keySource2.equals(key)) {
+ assertEquals(20, metricMap.get("readFailCount").value);
+ assertEquals(2000, metricMap.get("readFailSize").value);
+ } else if (keySink1.equals(key)) {
+ assertEquals(1, metricMap.get("sendCount").value);
+ assertEquals(100, metricMap.get("sendSize").value);
+ assertEquals(1, metricMap.get("sendSuccessCount").value);
+ assertEquals(100, metricMap.get("sendSuccessSize").value);
+ } else if (keySink2.equals(key)) {
+ assertEquals(20, metricMap.get("sendCount").value);
+ assertEquals(2000, metricMap.get("sendSize").value);
+ assertEquals(20, metricMap.get("sendFailCount").value);
+ assertEquals(2000, metricMap.get("sendFailSize").value);
+ } else {
+ System.out.println("bad MetricItem:" + key);
+ }
+ }
+ }
+ };
+ List<MetricListener> listeners = new ArrayList<>();
+ listeners.add(listener);
+ MetricListenerRunnable runnable = new MetricListenerRunnable("Sort", listeners);
+ List<MetricItemValue> itemValues = runnable.getItemValues();
+ runnable.run();
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestSortMetricItemSet.java b/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestSortMetricItemSet.java
new file mode 100644
index 0000000..371bfc1
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestSortMetricItemSet.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricUtils;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * TestMetricItemSetMBean
+ */
+public class TestSortMetricItemSet {
+
+ public static final String CLUSTER_ID = "inlong5th_sz";
+ public static final String CONTAINER_NAME = "2222.inlong.Sort.sz100001";
+ public static final String CONTAINER_IP = "127.0.0.1";
+ private static final String SOURCE_ID = "agent-source";
+ private static final String SOURCE_DATA_ID = "12069";
+ private static final String INLONG_GROUP_ID1 = "03a00000026";
+ private static final String INLONG_GROUP_ID2 = "03a00000126";
+ private static final String INLONG_STREAM_ID = "";
+ private static final String SINK_ID = "inlong5th-pulsar-sz";
+ private static final String SINK_DATA_ID = "PULSAR_TOPIC_1";
+ private static SortMetricItemSet itemSet;
+ private static Map<String, String> dimSource;
+ private static Map<String, String> dimSink;
+
+ /**
+ * setup
+ */
+ @BeforeClass
+ public static void setup() {
+ itemSet = new SortMetricItemSet(CLUSTER_ID);
+ MetricRegister.register(itemSet);
+ // prepare
+ SortMetricItem itemSource = new SortMetricItem();
+ itemSource.clusterId = CLUSTER_ID;
+ itemSource.sourceId = SOURCE_ID;
+ itemSource.sourceDataId = SOURCE_DATA_ID;
+ itemSource.inlongGroupId = INLONG_GROUP_ID1;
+ itemSource.inlongStreamId = INLONG_STREAM_ID;
+ dimSource = itemSource.getDimensions();
+ //
+ SortMetricItem itemSink = new SortMetricItem();
+ itemSink.clusterId = CLUSTER_ID;
+ itemSink.sinkId = SINK_ID;
+ itemSink.sinkDataId = SINK_DATA_ID;
+ itemSink.inlongGroupId = INLONG_GROUP_ID1;
+ itemSink.inlongStreamId = INLONG_STREAM_ID;
+ dimSink = itemSink.getDimensions();
+ }
+
+ /**
+ * testResult
+ *
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testResult() throws Exception {
+ // increase source
+ SortMetricItem item = null;
+ item = itemSet.findMetricItem(dimSource);
+ item.readSuccessCount.incrementAndGet();
+ item.readSuccessSize.addAndGet(100);
+ String keySource1 = MetricUtils.getDimensionsKey(dimSource);
+ //
+ dimSource.put("inlongGroupId", INLONG_GROUP_ID2);
+ item = itemSet.findMetricItem(dimSource);
+ item.readFailCount.addAndGet(20);
+ item.readFailSize.addAndGet(2000);
+ String keySource2 = MetricUtils.getDimensionsKey(dimSource);
+ // increase sink
+ item = itemSet.findMetricItem(dimSink);
+ item.sendCount.incrementAndGet();
+ item.sendSize.addAndGet(100);
+ item.sendSuccessCount.incrementAndGet();
+ item.sendSuccessSize.addAndGet(100);
+ String keySink1 = MetricUtils.getDimensionsKey(dimSink);
+ //
+ dimSink.put("inlongGroupId", INLONG_GROUP_ID2);
+ item = itemSet.findMetricItem(dimSink);
+ item.sendCount.addAndGet(20);
+ item.sendSize.addAndGet(2000);
+ item.sendFailCount.addAndGet(20);
+ item.sendFailSize.addAndGet(2000);
+ String keySink2 = MetricUtils.getDimensionsKey(dimSink);
+ // report
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ {
+ StringBuilder beanName = new StringBuilder();
+ beanName.append(MetricRegister.JMX_DOMAIN).append(MetricItemMBean.DOMAIN_SEPARATOR)
+ .append("type=").append(MetricUtils.getDomain(SortMetricItemSet.class))
+ .append(MetricItemMBean.PROPERTY_SEPARATOR)
+ .append("name=").append(itemSet.getName());
+ String strBeanName = beanName.toString();
+ ObjectName objName = new ObjectName(strBeanName);
+ List<MetricItem> items = (List<MetricItem>) mbs.invoke(objName, MetricItemSetMBean.METHOD_SNAPSHOT, null,
+ null);
+ for (MetricItem itemObj : items) {
+ if (keySource1.equals(itemObj.getDimensionsKey())) {
+ Map<String, MetricValue> metricMap = itemObj.snapshot();
+ assertEquals(1, metricMap.get("readSuccessCount").value);
+ assertEquals(100, metricMap.get("readSuccessSize").value);
+ } else if (keySource2.equals(itemObj.getDimensionsKey())) {
+ Map<String, MetricValue> metricMap = itemObj.snapshot();
+ assertEquals(20, metricMap.get("readFailCount").value);
+ assertEquals(2000, metricMap.get("readFailSize").value);
+ } else if (keySink1.equals(itemObj.getDimensionsKey())) {
+ Map<String, MetricValue> metricMap = itemObj.snapshot();
+ assertEquals(1, metricMap.get("sendCount").value);
+ assertEquals(100, metricMap.get("sendSize").value);
+ assertEquals(1, metricMap.get("sendSuccessCount").value);
+ assertEquals(100, metricMap.get("sendSuccessSize").value);
+ } else if (keySink2.equals(itemObj.getDimensionsKey())) {
+ Map<String, MetricValue> metricMap = itemObj.snapshot();
+ assertEquals(20, metricMap.get("sendCount").value);
+ assertEquals(2000, metricMap.get("sendSize").value);
+ assertEquals(20, metricMap.get("sendFailCount").value);
+ assertEquals(2000, metricMap.get("sendFailSize").value);
+ } else {
+ System.out.println("bad MetricItem:" + itemObj.getDimensionsKey());
+ }
+ }
+ }
+ {
+ StringBuilder beanName = new StringBuilder();
+ beanName.append(MetricRegister.JMX_DOMAIN).append(MetricItemMBean.DOMAIN_SEPARATOR)
+ .append("type=").append(MetricUtils.getDomain(SortMetricItemSet.class))
+ .append(MetricItemMBean.PROPERTY_SEPARATOR)
+ .append("name=").append(itemSet.getName());
+
+ String strBeanName = beanName.toString();
+ ObjectName objName = new ObjectName(strBeanName);
+ Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null);
+ }
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
index e3dd138..1fbc7ff 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
@@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.commons.config.metrics.MetricValue;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.metrics.MetricItemValue;
@@ -66,14 +65,14 @@ import io.prometheus.client.exporter.HTTPServer;
*/
public class PrometheusMetricListener extends Collector implements MetricListener {
- public static final Logger LOG = LoggerFactory.getLogger(MetricRegister.class);
+ public static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricListener.class);
public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
public static final int DEFAULT_PROMETHEUS_HTTP_PORT = 8080;
public static final String DEFAULT_DIMENSION_LABEL = "dimension";
+ private String metricName;
private SortMetricItem metricItem;
private Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>();
- private String metricName;
protected HTTPServer httpServer;
private Map<String, MetricItemValue> dimensionMetricValueMap = new ConcurrentHashMap<>();
private List<String> dimensionKeys = new ArrayList<>();
@@ -82,11 +81,12 @@ public class PrometheusMetricListener extends Collector implements MetricListene
* Constructor
*/
public PrometheusMetricListener() {
+ this.metricName = CommonPropertiesHolder.getString(KEY_CLUSTER_ID);
this.metricItem = new SortMetricItem();
- this.metricItem.clusterId = CommonPropertiesHolder.getString(KEY_CLUSTER_ID);
+ this.metricItem.clusterId = metricName;
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
StringBuilder beanName = new StringBuilder();
- beanName.append(JMX_DOMAIN).append(DOMAIN_SEPARATOR).append("type=DataProxyCounter");
+ beanName.append(JMX_DOMAIN).append(DOMAIN_SEPARATOR).append("type=SortStandalonePrometheus");
String strBeanName = beanName.toString();
try {
ObjectName objName = new ObjectName(strBeanName);
@@ -115,6 +115,7 @@ public class PrometheusMetricListener extends Collector implements MetricListene
int httpPort = CommonPropertiesHolder.getInteger(KEY_PROMETHEUS_HTTP_PORT, DEFAULT_PROMETHEUS_HTTP_PORT);
try {
this.httpServer = new HTTPServer(httpPort);
+ this.register();
} catch (IOException e) {
LOG.error("exception while register prometheus http server:{},error:{}", metricName, e.getMessage());
}
@@ -142,7 +143,7 @@ public class PrometheusMetricListener extends Collector implements MetricListene
// id dimension
String dimensionKey = itemValue.getKey();
MetricItemValue dimensionMetricValue = this.dimensionMetricValueMap.get(dimensionKey);
- if (dimensionKey == null) {
+ if (dimensionMetricValue == null) {
dimensionMetricValue = new MetricItemValue(dimensionKey, new ConcurrentHashMap<String, String>(),
new ConcurrentHashMap<String, MetricValue>());
this.dimensionMetricValueMap.putIfAbsent(dimensionKey, dimensionMetricValue);
@@ -177,7 +178,8 @@ public class PrometheusMetricListener extends Collector implements MetricListene
@Override
public List<MetricFamilySamples> collect() {
// total
- CounterMetricFamily totalCounter = new CounterMetricFamily(metricName + ".total", "help",
+ CounterMetricFamily totalCounter = new CounterMetricFamily(metricName + "&group=total",
+ "The metrics of SortStandalone node.",
Arrays.asList("dimension"));
totalCounter.addMetric(Arrays.asList(M_READ_SUCCESS_COUNT), metricItem.readSuccessCount.get());
totalCounter.addMetric(Arrays.asList(M_READ_SUCCESS_SIZE), metricItem.readSuccessSize.get());
@@ -199,7 +201,8 @@ public class PrometheusMetricListener extends Collector implements MetricListene
mfs.add(totalCounter);
// id dimension
- CounterMetricFamily idCounter = new CounterMetricFamily(metricName + ".id", "help", this.dimensionKeys);
+ CounterMetricFamily idCounter = new CounterMetricFamily(metricName + "&group=id",
+ "The metrics of inlong datastream.", this.dimensionKeys);
for (Entry<String, MetricItemValue> entry : this.dimensionMetricValueMap.entrySet()) {
MetricItemValue itemValue = entry.getValue();
// read
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
index b6814b3..4084fe7 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
@@ -213,6 +213,7 @@ public class HiveSink extends AbstractSink implements Configurable {
// new runnable
WriteHdfsFileRunnable writeTask = new WriteHdfsFileRunnable(context, idFile, dispatchProfile);
context.getOutputPool().execute(writeTask);
+ context.addSendMetric(dispatchProfile, context.getTaskName());
dispatchProfile = this.dispatchQueue.poll();
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/WriteHdfsFileRunnable.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/WriteHdfsFileRunnable.java
index 0935275..9848471 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/WriteHdfsFileRunnable.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/WriteHdfsFileRunnable.java
@@ -34,6 +34,7 @@ public class WriteHdfsFileRunnable implements Runnable {
private final HiveSinkContext context;
private final HdfsIdFile idFile;
private final DispatchProfile profile;
+ private final long sendTime;
/**
* Constructor
@@ -46,6 +47,7 @@ public class WriteHdfsFileRunnable implements Runnable {
this.context = context;
this.idFile = idFile;
this.profile = profile;
+ this.sendTime = System.currentTimeMillis();
}
/**
@@ -55,6 +57,7 @@ public class WriteHdfsFileRunnable implements Runnable {
public void run() {
synchronized (idFile) {
if (!idFile.isOpen()) {
+ context.addSendResultMetric(profile, context.getTaskName(), false, sendTime);
context.getDispatchQueue().offer(profile);
return;
}
@@ -67,8 +70,10 @@ public class WriteHdfsFileRunnable implements Runnable {
output.writeByte(HdfsIdFile.SEPARATOR_MESSAGE);
}
output.flush();
+ context.addSendResultMetric(profile, context.getTaskName(), true, sendTime);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
+ context.addSendResultMetric(profile, context.getTaskName(), false, sendTime);
context.getDispatchQueue().offer(profile);
}
}