You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/01/23 05:50:58 UTC

[incubator-inlong] branch master updated: [INLONG-2263] SortStandalone get metric value with error JMX ObjectName (#2265)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 18bb2e1  [INLONG-2263] SortStandalone get metric value with error JMX ObjectName (#2265)
18bb2e1 is described below

commit 18bb2e148ae90bdc034a5f43bd6e0662b67cf54f
Author: 卢春亮 <94...@qq.com>
AuthorDate: Sun Jan 23 13:50:50 2022 +0800

    [INLONG-2263] SortStandalone get metric value with error JMX ObjectName (#2265)
    
    * [INLONG-2263] SortStandalone get metric value with error JMX ObjectName
    
    * fix PrometheusMetricListener.java
    
    * add UT for this bug
---
 .../standalone/metrics/MetricListenerRunnable.java |  15 +-
 .../metrics/TestMetricListenerRunnable.java        | 152 ++++++++++++++++++
 .../standalone/metrics/TestSortMetricItemSet.java  | 170 +++++++++++++++++++++
 .../prometheus/PrometheusMetricListener.java       |  19 ++-
 .../inlong/sort/standalone/sink/hive/HiveSink.java |   1 +
 .../sink/hive/WriteHdfsFileRunnable.java           |   5 +
 6 files changed, 350 insertions(+), 12 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
index 0cdf29f..be1b9db 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/metrics/MetricListenerRunnable.java
@@ -36,9 +36,11 @@ import org.apache.commons.lang.ClassUtils;
 import org.apache.inlong.commons.config.metrics.MetricItem;
 import org.apache.inlong.commons.config.metrics.MetricItemMBean;
 import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricUtils;
 import org.apache.inlong.commons.config.metrics.MetricValue;
-import org.slf4j.Logger;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
 
 /**
  * 
@@ -46,7 +48,7 @@ import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
  */
 public class MetricListenerRunnable implements Runnable {
 
-    public static final Logger LOG = InlongLoggerFactory.getLogger(MetricObserver.class);
+    public static final Logger LOG = InlongLoggerFactory.getLogger(MetricListenerRunnable.class);
 
     private String domain;
     private List<MetricListener> listenerList;
@@ -92,9 +94,14 @@ public class MetricListenerRunnable implements Runnable {
      * @throws ClassNotFoundException
      */
     @SuppressWarnings("unchecked")
-    private List<MetricItemValue> getItemValues() throws InstanceNotFoundException, AttributeNotFoundException,
+    public List<MetricItemValue> getItemValues() throws InstanceNotFoundException, AttributeNotFoundException,
             ReflectionException, MBeanException, MalformedObjectNameException, ClassNotFoundException {
-        ObjectName objName = new ObjectName(domain + MetricItemMBean.DOMAIN_SEPARATOR + "*");
+        StringBuilder beanName = new StringBuilder();
+        beanName.append(MetricRegister.JMX_DOMAIN).append(MetricItemMBean.DOMAIN_SEPARATOR)
+                .append("type=").append(MetricUtils.getDomain(SortMetricItemSet.class))
+                .append(MetricItemMBean.PROPERTY_SEPARATOR)
+                .append("*");
+        ObjectName objName = new ObjectName(beanName.toString());
         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null);
         LOG.info("getItemValues for domain:{},queryMBeans:{}", domain, mbeans);
diff --git a/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java b/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java
new file mode 100644
index 0000000..d8309c4
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestMetricListenerRunnable.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricUtils;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * TestMetricItemSetMBean
+ */
+public class TestMetricListenerRunnable {
+
+    public static final String CLUSTER_ID = "inlong5th_sz";
+    public static final String CONTAINER_NAME = "2222.inlong.Sort.sz100001";
+    public static final String CONTAINER_IP = "127.0.0.1";
+    private static final String SOURCE_ID = "agent-source";
+    private static final String SOURCE_DATA_ID = "12069";
+    private static final String INLONG_GROUP_ID1 = "03a00000026";
+    private static final String INLONG_GROUP_ID2 = "03a00000126";
+    private static final String INLONG_STREAM_ID = "";
+    private static final String SINK_ID = "inlong5th-pulsar-sz";
+    private static final String SINK_DATA_ID = "PULSAR_TOPIC_1";
+    private static SortMetricItemSet itemSet;
+    private static Map<String, String> dimSource;
+    private static Map<String, String> dimSink;
+    private static String keySource1;
+    private static String keySource2;
+    private static String keySink1;
+    private static String keySink2;
+
+    /**
+     * setup
+     */
+    @BeforeClass
+    public static void setup() {
+        itemSet = new SortMetricItemSet(CLUSTER_ID);
+        MetricRegister.register(itemSet);
+        // prepare
+        SortMetricItem itemSource = new SortMetricItem();
+        itemSource.clusterId = CLUSTER_ID;
+        itemSource.sourceId = SOURCE_ID;
+        itemSource.sourceDataId = SOURCE_DATA_ID;
+        itemSource.inlongGroupId = INLONG_GROUP_ID1;
+        itemSource.inlongStreamId = INLONG_STREAM_ID;
+        dimSource = itemSource.getDimensions();
+        //
+        SortMetricItem itemSink = new SortMetricItem();
+        itemSink.clusterId = CLUSTER_ID;
+        itemSink.sinkId = SINK_ID;
+        itemSink.sinkDataId = SINK_DATA_ID;
+        itemSink.inlongGroupId = INLONG_GROUP_ID1;
+        itemSink.inlongStreamId = INLONG_STREAM_ID;
+        dimSink = itemSink.getDimensions();
+    }
+
+    /**
+     * testResult
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testResult() throws Exception {
+        // increase source
+        SortMetricItem item = null;
+        item = itemSet.findMetricItem(dimSource);
+        item.readSuccessCount.incrementAndGet();
+        item.readSuccessSize.addAndGet(100);
+        keySource1 = MetricUtils.getDimensionsKey(dimSource);
+        //
+        dimSource.put("inlongGroupId", INLONG_GROUP_ID2);
+        item = itemSet.findMetricItem(dimSource);
+        item.readFailCount.addAndGet(20);
+        item.readFailSize.addAndGet(2000);
+        keySource2 = MetricUtils.getDimensionsKey(dimSource);
+        // increase sink
+        item = itemSet.findMetricItem(dimSink);
+        item.sendCount.incrementAndGet();
+        item.sendSize.addAndGet(100);
+        item.sendSuccessCount.incrementAndGet();
+        item.sendSuccessSize.addAndGet(100);
+        keySink1 = MetricUtils.getDimensionsKey(dimSink);
+        //
+        dimSink.put("inlongGroupId", INLONG_GROUP_ID2);
+        item = itemSet.findMetricItem(dimSink);
+        item.sendCount.addAndGet(20);
+        item.sendSize.addAndGet(2000);
+        item.sendFailCount.addAndGet(20);
+        item.sendFailSize.addAndGet(2000);
+        keySink2 = MetricUtils.getDimensionsKey(dimSink);
+        // report
+        MetricListener listener = new MetricListener() {
+
+            @Override
+            public void snapshot(String domain, List<MetricItemValue> itemValues) {
+                assertEquals("Sort", domain);
+                for (MetricItemValue itemValue : itemValues) {
+                    String key = itemValue.getKey();
+                    Map<String, MetricValue> metricMap = itemValue.getMetrics();
+                    if (keySource1.equals(itemValue.getKey())) {
+                        assertEquals(1, metricMap.get("readSuccessCount").value);
+                        assertEquals(100, metricMap.get("readSuccessSize").value);
+                    } else if (keySource2.equals(key)) {
+                        assertEquals(20, metricMap.get("readFailCount").value);
+                        assertEquals(2000, metricMap.get("readFailSize").value);
+                    } else if (keySink1.equals(key)) {
+                        assertEquals(1, metricMap.get("sendCount").value);
+                        assertEquals(100, metricMap.get("sendSize").value);
+                        assertEquals(1, metricMap.get("sendSuccessCount").value);
+                        assertEquals(100, metricMap.get("sendSuccessSize").value);
+                    } else if (keySink2.equals(key)) {
+                        assertEquals(20, metricMap.get("sendCount").value);
+                        assertEquals(2000, metricMap.get("sendSize").value);
+                        assertEquals(20, metricMap.get("sendFailCount").value);
+                        assertEquals(2000, metricMap.get("sendFailSize").value);
+                    } else {
+                        System.out.println("bad MetricItem:" + key);
+                    }
+                }
+            }
+        };
+        List<MetricListener> listeners = new ArrayList<>();
+        listeners.add(listener);
+        MetricListenerRunnable runnable = new MetricListenerRunnable("Sort", listeners);
+        List<MetricItemValue> itemValues = runnable.getItemValues();
+        runnable.run();
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestSortMetricItemSet.java b/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestSortMetricItemSet.java
new file mode 100644
index 0000000..371bfc1
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/test/java/org/apache/inlong/sort/standalone/metrics/TestSortMetricItemSet.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.commons.config.metrics.MetricUtils;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * TestMetricItemSetMBean
+ */
+public class TestSortMetricItemSet {
+
+    public static final String CLUSTER_ID = "inlong5th_sz";
+    public static final String CONTAINER_NAME = "2222.inlong.Sort.sz100001";
+    public static final String CONTAINER_IP = "127.0.0.1";
+    private static final String SOURCE_ID = "agent-source";
+    private static final String SOURCE_DATA_ID = "12069";
+    private static final String INLONG_GROUP_ID1 = "03a00000026";
+    private static final String INLONG_GROUP_ID2 = "03a00000126";
+    private static final String INLONG_STREAM_ID = "";
+    private static final String SINK_ID = "inlong5th-pulsar-sz";
+    private static final String SINK_DATA_ID = "PULSAR_TOPIC_1";
+    private static SortMetricItemSet itemSet;
+    private static Map<String, String> dimSource;
+    private static Map<String, String> dimSink;
+
+    /**
+     * setup
+     */
+    @BeforeClass
+    public static void setup() {
+        itemSet = new SortMetricItemSet(CLUSTER_ID);
+        MetricRegister.register(itemSet);
+        // prepare
+        SortMetricItem itemSource = new SortMetricItem();
+        itemSource.clusterId = CLUSTER_ID;
+        itemSource.sourceId = SOURCE_ID;
+        itemSource.sourceDataId = SOURCE_DATA_ID;
+        itemSource.inlongGroupId = INLONG_GROUP_ID1;
+        itemSource.inlongStreamId = INLONG_STREAM_ID;
+        dimSource = itemSource.getDimensions();
+        //
+        SortMetricItem itemSink = new SortMetricItem();
+        itemSink.clusterId = CLUSTER_ID;
+        itemSink.sinkId = SINK_ID;
+        itemSink.sinkDataId = SINK_DATA_ID;
+        itemSink.inlongGroupId = INLONG_GROUP_ID1;
+        itemSink.inlongStreamId = INLONG_STREAM_ID;
+        dimSink = itemSink.getDimensions();
+    }
+
+    /**
+     * testResult
+     * 
+     * @throws Exception
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testResult() throws Exception {
+        // increase source
+        SortMetricItem item = null;
+        item = itemSet.findMetricItem(dimSource);
+        item.readSuccessCount.incrementAndGet();
+        item.readSuccessSize.addAndGet(100);
+        String keySource1 = MetricUtils.getDimensionsKey(dimSource);
+        //
+        dimSource.put("inlongGroupId", INLONG_GROUP_ID2);
+        item = itemSet.findMetricItem(dimSource);
+        item.readFailCount.addAndGet(20);
+        item.readFailSize.addAndGet(2000);
+        String keySource2 = MetricUtils.getDimensionsKey(dimSource);
+        // increase sink
+        item = itemSet.findMetricItem(dimSink);
+        item.sendCount.incrementAndGet();
+        item.sendSize.addAndGet(100);
+        item.sendSuccessCount.incrementAndGet();
+        item.sendSuccessSize.addAndGet(100);
+        String keySink1 = MetricUtils.getDimensionsKey(dimSink);
+        //
+        dimSink.put("inlongGroupId", INLONG_GROUP_ID2);
+        item = itemSet.findMetricItem(dimSink);
+        item.sendCount.addAndGet(20);
+        item.sendSize.addAndGet(2000);
+        item.sendFailCount.addAndGet(20);
+        item.sendFailSize.addAndGet(2000);
+        String keySink2 = MetricUtils.getDimensionsKey(dimSink);
+        // report
+        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        {
+            StringBuilder beanName = new StringBuilder();
+            beanName.append(MetricRegister.JMX_DOMAIN).append(MetricItemMBean.DOMAIN_SEPARATOR)
+                    .append("type=").append(MetricUtils.getDomain(SortMetricItemSet.class))
+                    .append(MetricItemMBean.PROPERTY_SEPARATOR)
+                    .append("name=").append(itemSet.getName());
+            String strBeanName = beanName.toString();
+            ObjectName objName = new ObjectName(strBeanName);
+            List<MetricItem> items = (List<MetricItem>) mbs.invoke(objName, MetricItemSetMBean.METHOD_SNAPSHOT, null,
+                    null);
+            for (MetricItem itemObj : items) {
+                if (keySource1.equals(itemObj.getDimensionsKey())) {
+                    Map<String, MetricValue> metricMap = itemObj.snapshot();
+                    assertEquals(1, metricMap.get("readSuccessCount").value);
+                    assertEquals(100, metricMap.get("readSuccessSize").value);
+                } else if (keySource2.equals(itemObj.getDimensionsKey())) {
+                    Map<String, MetricValue> metricMap = itemObj.snapshot();
+                    assertEquals(20, metricMap.get("readFailCount").value);
+                    assertEquals(2000, metricMap.get("readFailSize").value);
+                } else if (keySink1.equals(itemObj.getDimensionsKey())) {
+                    Map<String, MetricValue> metricMap = itemObj.snapshot();
+                    assertEquals(1, metricMap.get("sendCount").value);
+                    assertEquals(100, metricMap.get("sendSize").value);
+                    assertEquals(1, metricMap.get("sendSuccessCount").value);
+                    assertEquals(100, metricMap.get("sendSuccessSize").value);
+                } else if (keySink2.equals(itemObj.getDimensionsKey())) {
+                    Map<String, MetricValue> metricMap = itemObj.snapshot();
+                    assertEquals(20, metricMap.get("sendCount").value);
+                    assertEquals(2000, metricMap.get("sendSize").value);
+                    assertEquals(20, metricMap.get("sendFailCount").value);
+                    assertEquals(2000, metricMap.get("sendFailSize").value);
+                } else {
+                    System.out.println("bad MetricItem:" + itemObj.getDimensionsKey());
+                }
+            }
+        }
+        {
+            StringBuilder beanName = new StringBuilder();
+            beanName.append(MetricRegister.JMX_DOMAIN).append(MetricItemMBean.DOMAIN_SEPARATOR)
+                    .append("type=").append(MetricUtils.getDomain(SortMetricItemSet.class))
+                    .append(MetricItemMBean.PROPERTY_SEPARATOR)
+                    .append("name=").append(itemSet.getName());
+
+            String strBeanName = beanName.toString();
+            ObjectName objName = new ObjectName(strBeanName);
+            Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null);
+        }
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
index e3dd138..1fbc7ff 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/prometheus/PrometheusMetricListener.java
@@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.inlong.commons.config.metrics.MetricRegister;
 import org.apache.inlong.commons.config.metrics.MetricValue;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.metrics.MetricItemValue;
@@ -66,14 +65,14 @@ import io.prometheus.client.exporter.HTTPServer;
  */
 public class PrometheusMetricListener extends Collector implements MetricListener {
 
-    public static final Logger LOG = LoggerFactory.getLogger(MetricRegister.class);
+    public static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricListener.class);
     public static final String KEY_PROMETHEUS_HTTP_PORT = "prometheusHttpPort";
     public static final int DEFAULT_PROMETHEUS_HTTP_PORT = 8080;
     public static final String DEFAULT_DIMENSION_LABEL = "dimension";
 
+    private String metricName;
     private SortMetricItem metricItem;
     private Map<String, AtomicLong> metricValueMap = new ConcurrentHashMap<>();
-    private String metricName;
     protected HTTPServer httpServer;
     private Map<String, MetricItemValue> dimensionMetricValueMap = new ConcurrentHashMap<>();
     private List<String> dimensionKeys = new ArrayList<>();
@@ -82,11 +81,12 @@ public class PrometheusMetricListener extends Collector implements MetricListene
      * Constructor
      */
     public PrometheusMetricListener() {
+        this.metricName = CommonPropertiesHolder.getString(KEY_CLUSTER_ID);
         this.metricItem = new SortMetricItem();
-        this.metricItem.clusterId = CommonPropertiesHolder.getString(KEY_CLUSTER_ID);
+        this.metricItem.clusterId = metricName;
         final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         StringBuilder beanName = new StringBuilder();
-        beanName.append(JMX_DOMAIN).append(DOMAIN_SEPARATOR).append("type=DataProxyCounter");
+        beanName.append(JMX_DOMAIN).append(DOMAIN_SEPARATOR).append("type=SortStandalonePrometheus");
         String strBeanName = beanName.toString();
         try {
             ObjectName objName = new ObjectName(strBeanName);
@@ -115,6 +115,7 @@ public class PrometheusMetricListener extends Collector implements MetricListene
         int httpPort = CommonPropertiesHolder.getInteger(KEY_PROMETHEUS_HTTP_PORT, DEFAULT_PROMETHEUS_HTTP_PORT);
         try {
             this.httpServer = new HTTPServer(httpPort);
+            this.register();
         } catch (IOException e) {
             LOG.error("exception while register prometheus http server:{},error:{}", metricName, e.getMessage());
         }
@@ -142,7 +143,7 @@ public class PrometheusMetricListener extends Collector implements MetricListene
             // id dimension
             String dimensionKey = itemValue.getKey();
             MetricItemValue dimensionMetricValue = this.dimensionMetricValueMap.get(dimensionKey);
-            if (dimensionKey == null) {
+            if (dimensionMetricValue == null) {
                 dimensionMetricValue = new MetricItemValue(dimensionKey, new ConcurrentHashMap<String, String>(),
                         new ConcurrentHashMap<String, MetricValue>());
                 this.dimensionMetricValueMap.putIfAbsent(dimensionKey, dimensionMetricValue);
@@ -177,7 +178,8 @@ public class PrometheusMetricListener extends Collector implements MetricListene
     @Override
     public List<MetricFamilySamples> collect() {
         // total
-        CounterMetricFamily totalCounter = new CounterMetricFamily(metricName + ".total", "help",
+        CounterMetricFamily totalCounter = new CounterMetricFamily(metricName + "&group=total",
+                "The metrics of SortStandalone node.",
                 Arrays.asList("dimension"));
         totalCounter.addMetric(Arrays.asList(M_READ_SUCCESS_COUNT), metricItem.readSuccessCount.get());
         totalCounter.addMetric(Arrays.asList(M_READ_SUCCESS_SIZE), metricItem.readSuccessSize.get());
@@ -199,7 +201,8 @@ public class PrometheusMetricListener extends Collector implements MetricListene
         mfs.add(totalCounter);
 
         // id dimension
-        CounterMetricFamily idCounter = new CounterMetricFamily(metricName + ".id", "help", this.dimensionKeys);
+        CounterMetricFamily idCounter = new CounterMetricFamily(metricName + "&group=id",
+                "The metrics of inlong datastream.", this.dimensionKeys);
         for (Entry<String, MetricItemValue> entry : this.dimensionMetricValueMap.entrySet()) {
             MetricItemValue itemValue = entry.getValue();
             // read
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
index b6814b3..4084fe7 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/HiveSink.java
@@ -213,6 +213,7 @@ public class HiveSink extends AbstractSink implements Configurable {
             // new runnable
             WriteHdfsFileRunnable writeTask = new WriteHdfsFileRunnable(context, idFile, dispatchProfile);
             context.getOutputPool().execute(writeTask);
+            context.addSendMetric(dispatchProfile, context.getTaskName());
             dispatchProfile = this.dispatchQueue.poll();
         }
     }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/WriteHdfsFileRunnable.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/WriteHdfsFileRunnable.java
index 0935275..9848471 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/WriteHdfsFileRunnable.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/hive/WriteHdfsFileRunnable.java
@@ -34,6 +34,7 @@ public class WriteHdfsFileRunnable implements Runnable {
     private final HiveSinkContext context;
     private final HdfsIdFile idFile;
     private final DispatchProfile profile;
+    private final long sendTime;
 
     /**
      * Constructor
@@ -46,6 +47,7 @@ public class WriteHdfsFileRunnable implements Runnable {
         this.context = context;
         this.idFile = idFile;
         this.profile = profile;
+        this.sendTime = System.currentTimeMillis();
     }
 
     /**
@@ -55,6 +57,7 @@ public class WriteHdfsFileRunnable implements Runnable {
     public void run() {
         synchronized (idFile) {
             if (!idFile.isOpen()) {
+                context.addSendResultMetric(profile, context.getTaskName(), false, sendTime);
                 context.getDispatchQueue().offer(profile);
                 return;
             }
@@ -67,8 +70,10 @@ public class WriteHdfsFileRunnable implements Runnable {
                     output.writeByte(HdfsIdFile.SEPARATOR_MESSAGE);
                 }
                 output.flush();
+                context.addSendResultMetric(profile, context.getTaskName(), true, sendTime);
             } catch (Exception e) {
                 LOG.error(e.getMessage(), e);
+                context.addSendResultMetric(profile, context.getTaskName(), false, sendTime);
                 context.getDispatchQueue().offer(profile);
             }
         }