You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/07 08:31:58 UTC

[inlong] branch master updated: [INLONG-4854][Agent] Report metrics at inongGroupId and inlongStreamId (#4855)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dc9a06f39 [INLONG-4854][Agent] Report metrics at inongGroupId and inlongStreamId (#4855)
dc9a06f39 is described below

commit dc9a06f3940d39332b0fdde842af9b6eb4f31413
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Thu Jul 7 16:31:51 2022 +0800

    [INLONG-4854][Agent] Report metrics at inongGroupId and inlongStreamId (#4855)
---
 .../inlong/agent/plugin/channel/MemoryChannel.java |  54 +++----
 .../inlong/agent/plugin/metrics/GlobalMetrics.java | 175 +++++++++++++++++++++
 .../agent/plugin/metrics/PluginJmxMetric.java      |   4 +-
 .../plugin/metrics/PluginPrometheusMetric.java     |  50 +++---
 .../inlong/agent/plugin/metrics/SinkJmxMetric.java |   2 +-
 .../metrics/{SinkMetrics.java => SinkMetric.java}  |   2 +-
 ...theusMetrics.java => SinkPrometheusMetric.java} |  16 +-
 .../agent/plugin/metrics/SourceJmxMetric.java      |   2 +-
 .../{SourceMetrics.java => SourceMetric.java}      |   8 +-
 ...eusMetrics.java => SourcePrometheusMetric.java} |  24 +--
 .../inlong/agent/plugin/sinks/AbstractSink.java    |  29 +---
 .../inlong/agent/plugin/sinks/ConsoleSink.java     |  12 +-
 .../inlong/agent/plugin/sinks/ProxySink.java       |   9 +-
 .../inlong/agent/plugin/sinks/SenderManager.java   | 109 ++++++-------
 .../inlong/agent/plugin/sources/BinlogSource.java  |  26 ++-
 .../agent/plugin/sources/DatabaseSqlSource.java    |  36 ++---
 .../inlong/agent/plugin/sources/KafkaSource.java   |  25 ++-
 .../agent/plugin/sources/TextFileSource.java       |  30 ++--
 .../plugin/sources/reader/AbstractReader.java      |  41 +----
 .../agent/plugin/sources/reader/BinlogReader.java  |   7 +-
 .../agent/plugin/sources/reader/KafkaReader.java   |  34 +---
 .../agent/plugin/sources/reader/SqlReader.java     |  40 +++--
 .../plugin/sources/reader/TextFileReader.java      |  10 +-
 .../agent/plugin/metrics/GlobalMetricsTest.java    |  66 ++++++++
 .../apache/inlong/agent/plugin/sinks/MockSink.java |  38 ++---
 25 files changed, 480 insertions(+), 369 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
index 56769a30f..583a3c5ae 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
@@ -19,13 +19,10 @@ package org.apache.inlong.agent.plugin.channel;
 
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.plugin.Channel;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +30,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+
 /**
  * memory channel
  */
@@ -42,48 +41,48 @@ public class MemoryChannel implements Channel {
 
     private static final String MEMORY_CHANNEL_TAG_NAME = "AgentMemoryPlugin";
     private static AtomicLong metricsIndex = new AtomicLong(0);
-    private final PluginMetric pluginMetricNew;
     private LinkedBlockingQueue<Message> queue;
 
     public MemoryChannel() {
-        if (ConfigUtil.isPrometheusEnabled()) {
-            this.pluginMetricNew = new PluginPrometheusMetric(AgentUtils.getUniqId(
-                    MEMORY_CHANNEL_TAG_NAME, metricsIndex.incrementAndGet()));
-        } else {
-            this.pluginMetricNew = new PluginJmxMetric(AgentUtils.getUniqId(
-                    MEMORY_CHANNEL_TAG_NAME, metricsIndex.incrementAndGet()));
-        }
     }
 
     @Override
     public void push(Message message) {
+        String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
         try {
             if (message != null) {
-                pluginMetricNew.incReadNum();
+                if (message instanceof ProxyMessage) {
+                    groupId = ((ProxyMessage) message).getInlongGroupId();
+                }
+                GlobalMetrics.incReadNum(groupId);
                 queue.put(message);
-                pluginMetricNew.incReadSuccessNum();
+                GlobalMetrics.incReadSuccessNum(groupId);
             }
         } catch (InterruptedException ex) {
-            pluginMetricNew.incReadFailedNum();
+            GlobalMetrics.incReadFailedNum(groupId);
             Thread.currentThread().interrupt();
         }
     }
 
     @Override
     public boolean push(Message message, long timeout, TimeUnit unit) {
+        String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
         try {
             if (message != null) {
-                pluginMetricNew.incReadNum();
+                if (message instanceof ProxyMessage) {
+                    groupId = ((ProxyMessage) message).getInlongGroupId();
+                }
+                GlobalMetrics.incReadNum(groupId);
                 boolean result = queue.offer(message, timeout, unit);
                 if (result) {
-                    pluginMetricNew.incReadSuccessNum();
+                    GlobalMetrics.incReadSuccessNum(groupId);
                 } else {
-                    pluginMetricNew.incReadFailedNum();
+                    GlobalMetrics.incReadFailedNum(groupId);
                 }
                 return result;
             }
         } catch (InterruptedException ex) {
-            pluginMetricNew.incReadFailedNum();
+            GlobalMetrics.incReadFailedNum(groupId);
             Thread.currentThread().interrupt();
         }
         return false;
@@ -91,14 +90,18 @@ public class MemoryChannel implements Channel {
 
     @Override
     public Message pull(long timeout, TimeUnit unit) {
+        String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
         try {
             Message message = queue.poll(timeout, unit);
             if (message != null) {
-                pluginMetricNew.incSendSuccessNum();
+                if (message instanceof ProxyMessage) {
+                    groupId = ((ProxyMessage) message).getInlongGroupId();
+                }
+                GlobalMetrics.incSendSuccessNum(groupId);
             }
             return message;
         } catch (InterruptedException ex) {
-            pluginMetricNew.incSendFailedNum();
+            GlobalMetrics.incSendFailedNum(groupId);
             Thread.currentThread().interrupt();
             throw new IllegalStateException(ex);
         }
@@ -116,10 +119,7 @@ public class MemoryChannel implements Channel {
         if (queue != null) {
             queue.clear();
         }
-        LOGGER.info("destroy channel, memory channel metric, readNum: {}, readSuccessNum: {}, "
-                        + "readFailedNum: {}, sendSuccessNum: {}, sendFailedNum: {}",
-                pluginMetricNew.getReadNum(), pluginMetricNew.getReadSuccessNum(),
-                pluginMetricNew.getReadFailedNum(), pluginMetricNew.getSendSuccessNum(),
-                pluginMetricNew.getSendFailedNum());
+        LOGGER.info("destroy channel, show memory channel metric:");
+        GlobalMetrics.showMemoryChannelStatics();
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/GlobalMetrics.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/GlobalMetrics.java
new file mode 100644
index 000000000..9bd691e85
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/GlobalMetrics.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.metrics;
+
+import org.apache.inlong.agent.utils.ConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class GlobalMetrics {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalMetrics.class);
+
+    // key: groupId_streamId
+    private static final ConcurrentHashMap<String, PluginMetric> pluginMetrics = new ConcurrentHashMap<>();
+    // key: sourceType_groupId_streamId
+    private static final ConcurrentHashMap<String, SourceMetric> sourceMetrics = new ConcurrentHashMap<>();
+    // key: sinkType_groupId_streamId
+    private static final ConcurrentHashMap<String, SinkMetric> sinkMetrics = new ConcurrentHashMap<>();
+
+    private static PluginMetric getPluginMetric(String tagName) {
+        return pluginMetrics.computeIfAbsent(tagName, (key) -> addPluginMetric(tagName));
+    }
+
+    private static PluginMetric addPluginMetric(String tagName) {
+        PluginMetric metric;
+        if (ConfigUtil.isPrometheusEnabled()) {
+            metric = new PluginPrometheusMetric(tagName);
+        } else {
+            metric = new PluginJmxMetric(tagName);
+        }
+        LOGGER.info("add {} pluginMetrics", tagName);
+        return metric;
+    }
+
+    private static SourceMetric getSourceMetric(String tagName) {
+        return sourceMetrics.computeIfAbsent(tagName, (key) -> addSourceMetric(tagName));
+    }
+
+    private static SourceMetric addSourceMetric(String tagName) {
+        SourceMetric metric;
+        if (ConfigUtil.isPrometheusEnabled()) {
+            metric = new SourcePrometheusMetric(tagName);
+        } else {
+            metric = new SourceJmxMetric(tagName);
+        }
+        LOGGER.info("add {} sourceMetric", tagName);
+        return metric;
+    }
+
+    private static SinkMetric getSinkMetric(String tagName) {
+        return sinkMetrics.computeIfAbsent(tagName, (key) -> addSinkMetric(tagName));
+    }
+
+    private static SinkMetric addSinkMetric(String tagName) {
+        SinkMetric metric;
+        if (ConfigUtil.isPrometheusEnabled()) {
+            metric = new SinkPrometheusMetric(tagName);
+        } else {
+            metric = new SinkJmxMetric(tagName);
+        }
+        LOGGER.info("add {} sinkMetric", tagName);
+        return metric;
+    }
+
+    public static void incReadNum(String tagName) {
+        getPluginMetric(tagName).incReadNum();
+    }
+
+    public static long getReadNum(String tagName) {
+        return getPluginMetric(tagName).getReadNum();
+    }
+
+    public static void incSendNum(String tagName) {
+        getPluginMetric(tagName).incSendNum();
+    }
+
+    public static long getSendNum(String tagName) {
+        return getPluginMetric(tagName).getSendNum(); // report the send counter, matching incSendNum
+    }
+
+    public static void incReadFailedNum(String tagName) {
+        getPluginMetric(tagName).incReadFailedNum();
+    }
+
+    public static long getReadFailedNum(String tagName) {
+        return getPluginMetric(tagName).getReadFailedNum();
+    }
+
+    public static void incSendFailedNum(String tagName) {
+        getPluginMetric(tagName).incSendFailedNum();
+    }
+
+    public static long getSendFailedNum(String tagName) {
+        return getPluginMetric(tagName).getSendFailedNum();
+    }
+
+    public static void incReadSuccessNum(String tagName) {
+        getPluginMetric(tagName).incReadSuccessNum();
+    }
+
+    public static long getReadSuccessNum(String tagName) {
+        return getPluginMetric(tagName).getReadSuccessNum();
+    }
+
+    public static void incSendSuccessNum(String tagName) {
+        getPluginMetric(tagName).incSendSuccessNum();
+    }
+
+    public static void incSendSuccessNum(String tagName, int delta) {
+        getPluginMetric(tagName).incSendSuccessNum(delta);
+    }
+
+    public static long getSendSuccessNum(String tagName) {
+        return getPluginMetric(tagName).getSendSuccessNum();
+    }
+
+    public static void incSinkSuccessCount(String tagName) {
+        getSinkMetric(tagName).incSinkSuccessCount();
+    }
+
+    public static long getSinkSuccessCount(String tagName) {
+        return getSinkMetric(tagName).getSinkSuccessCount();
+    }
+
+    public static void incSinkFailCount(String tagName) {
+        getSinkMetric(tagName).incSinkFailCount();
+    }
+
+    public static long getSinkFailCount(String tagName) {
+        return getSinkMetric(tagName).getSinkFailCount();
+    }
+
+    public static void incSourceSuccessCount(String tagName) {
+        getSourceMetric(tagName).incSourceSuccessCount();
+    }
+
+    public static long getSourceSuccessCount(String tagName) {
+        return getSourceMetric(tagName).getSourceSuccessCount();
+    }
+
+    public static void incSourceFailCount(String tagName) {
+        getSourceMetric(tagName).incSourceFailCount();
+    }
+
+    public static void showMemoryChannelStatics() {
+        for (Entry<String, PluginMetric> entry : pluginMetrics.entrySet()) {
+            LOGGER.info("tagName:{} ### readNum: {}, readSuccessNum: {}, readFailedNum: {}, sendSuccessNum: {}, "
+                            + "sendFailedNum: {}", entry.getKey(), entry.getValue().getReadNum(),
+                    entry.getValue().getReadSuccessNum(), entry.getValue().getReadFailedNum(),
+                    entry.getValue().getSendSuccessNum(), entry.getValue().getSendFailedNum());
+        }
+    }
+
+    public static long getSourceFailCount(String tagName) { // static, like every other accessor here
+        return getSourceMetric(tagName).getSourceFailCount();
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java
index 5e75c3a2a..93c7e8e85 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginJmxMetric.java
@@ -19,14 +19,14 @@
 
 package org.apache.inlong.agent.plugin.metrics;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.inlong.agent.metrics.Metric;
 import org.apache.inlong.common.metric.Dimension;
 import org.apache.inlong.common.metric.MetricDomain;
 import org.apache.inlong.common.metric.MetricItem;
 import org.apache.inlong.common.metric.MetricRegister;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * metrics for agent plugin
  */
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java
index 7b931712f..2f1f90236 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/PluginPrometheusMetric.java
@@ -23,8 +23,8 @@ import io.prometheus.client.Counter;
 
 public class PluginPrometheusMetric implements PluginMetric {
 
+    // agent-metrics
     public static final String AGENT_PLUGIN_METRICS_PREFIX = "inlong_agent_plugin_";
-
     public static final String READ_NUM_COUNTER_NAME = "read_num_count";
     public static final String SEND_NUM_COUNTER_NAME = "send_num_count";
     public static final String READ_FAILED_NUM_COUNTER_NAME = "read_failed_num_count";
@@ -32,44 +32,40 @@ public class PluginPrometheusMetric implements PluginMetric {
     public static final String READ_SUCCESS_NUM_COUNTER_NAME = "read_success_num_count";
     public static final String SEND_SUCCESS_NUM_COUNTER_NAME = "send_success_num_count";
 
-    private final String tagName;
-
-    private static final Counter READ_NUM_COUNTER = Counter.build()
+    // agent-counters
+    private final Counter readNumCounter = Counter.build()
             .name(AGENT_PLUGIN_METRICS_PREFIX + READ_NUM_COUNTER_NAME)
             .help("The total number of reads.")
             .labelNames("tag")
             .register();
-
-    private static final Counter SEND_NUM_COUNTER = Counter.build()
+    private final Counter sendNumCounter = Counter.build()
             .name(AGENT_PLUGIN_METRICS_PREFIX + SEND_NUM_COUNTER_NAME)
             .help("The total number of sends.")
             .labelNames("tag")
             .register();
-
-    private static final Counter READ_FAILED_NUM_COUNTER = Counter.build()
+    private final Counter readFailedNumCounter = Counter.build()
             .name(AGENT_PLUGIN_METRICS_PREFIX + READ_FAILED_NUM_COUNTER_NAME)
             .help("The total number of failed reads.")
             .labelNames("tag")
             .register();
-
-    private static final Counter SEND_FAILED_NUM_COUNTER = Counter.build()
+    private final Counter sendFailedNumCounter = Counter.build()
             .name(AGENT_PLUGIN_METRICS_PREFIX + SEND_FAILED_NUM_COUNTER_NAME)
             .help("The total number of failed sends.")
             .labelNames("tag")
             .register();
-
-    private static final Counter READ_SUCCESS_NUM_COUNTER = Counter.build()
+    private final Counter readSuccessNumCounter = Counter.build()
             .name(AGENT_PLUGIN_METRICS_PREFIX + READ_SUCCESS_NUM_COUNTER_NAME)
             .help("The total number of successful reads.")
             .labelNames("tag")
             .register();
-
-    private static final Counter SEND_SUCCESS_NUM_COUNTER = Counter.build()
+    private final Counter sendSuccessNumCounter = Counter.build()
             .name(AGENT_PLUGIN_METRICS_PREFIX + SEND_SUCCESS_NUM_COUNTER_NAME)
             .help("The total number of successful sends.")
             .labelNames("tag")
             .register();
 
+    private String tagName;
+
     public PluginPrometheusMetric(String tagName) {
         this.tagName = tagName;
     }
@@ -81,66 +77,66 @@ public class PluginPrometheusMetric implements PluginMetric {
 
     @Override
     public void incReadNum() {
-        READ_NUM_COUNTER.labels(tagName).inc();
+        readNumCounter.labels(tagName).inc();
     }
 
     @Override
     public long getReadNum() {
-        return (long) READ_NUM_COUNTER.labels(tagName).get();
+        return (long) readNumCounter.labels(tagName).get();
     }
 
     @Override
     public void incSendNum() {
-        SEND_NUM_COUNTER.labels(tagName).inc();
+        sendNumCounter.labels(tagName).inc();
     }
 
     @Override
     public long getSendNum() {
-        return (long) SEND_NUM_COUNTER.labels(tagName).get();
+        return (long) sendNumCounter.labels(tagName).get();
     }
 
     @Override
     public void incReadFailedNum() {
-        READ_FAILED_NUM_COUNTER.labels(tagName).inc();
+        readFailedNumCounter.labels(tagName).inc();
     }
 
     @Override
     public long getReadFailedNum() {
-        return (long) READ_FAILED_NUM_COUNTER.labels(tagName).get();
+        return (long) readFailedNumCounter.labels(tagName).get();
     }
 
     @Override
     public void incSendFailedNum() {
-        SEND_FAILED_NUM_COUNTER.labels(tagName).inc();
+        sendFailedNumCounter.labels(tagName).inc();
     }
 
     @Override
     public long getSendFailedNum() {
-        return (long) SEND_FAILED_NUM_COUNTER.labels(tagName).get();
+        return (long) sendFailedNumCounter.labels(tagName).get();
     }
 
     @Override
     public void incReadSuccessNum() {
-        READ_SUCCESS_NUM_COUNTER.labels(tagName).inc();
+        readSuccessNumCounter.labels(tagName).inc();
     }
 
     @Override
     public long getReadSuccessNum() {
-        return (long) READ_SUCCESS_NUM_COUNTER.labels(tagName).get();
+        return (long) readSuccessNumCounter.labels(tagName).get();
     }
 
     @Override
     public void incSendSuccessNum() {
-        SEND_SUCCESS_NUM_COUNTER.labels(tagName).inc();
+        sendSuccessNumCounter.labels(tagName).inc();
     }
 
     @Override
     public void incSendSuccessNum(int delta) {
-        SEND_SUCCESS_NUM_COUNTER.labels(tagName).inc(delta);
+        sendSuccessNumCounter.labels(tagName).inc(delta);
     }
 
     @Override
     public long getSendSuccessNum() {
-        return (long) SEND_SUCCESS_NUM_COUNTER.labels(tagName).get();
+        return (long) sendSuccessNumCounter.labels(tagName).get();
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java
index 612eeadd7..b4cf365e0 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkJmxMetric.java
@@ -28,7 +28,7 @@ import org.apache.inlong.common.metric.MetricRegister;
 import java.util.concurrent.atomic.AtomicLong;
 
 @MetricDomain(name = "SinkMetric")
-public class SinkJmxMetric extends MetricItem implements SinkMetrics {
+public class SinkJmxMetric extends MetricItem implements SinkMetric {
 
     @Dimension
     private final String tagName;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetrics.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetric.java
similarity index 97%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetrics.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetric.java
index 7bf325644..a9e6d92cb 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetrics.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkMetric.java
@@ -19,7 +19,7 @@
 
 package org.apache.inlong.agent.plugin.metrics;
 
-public interface SinkMetrics {
+public interface SinkMetric {
 
     /**
      *  The tag name of sink metrics.
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetrics.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetric.java
similarity index 80%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetrics.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetric.java
index 61482f6ef..16fe4f6d2 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetrics.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SinkPrometheusMetric.java
@@ -21,7 +21,7 @@ package org.apache.inlong.agent.plugin.metrics;
 
 import io.prometheus.client.Counter;
 
-public class SinkPrometheusMetrics implements SinkMetrics {
+public class SinkPrometheusMetric implements SinkMetric {
 
     public static final String AGENT_SINK_METRICS_PREFIX = "inlong_agent_sink_";
 
@@ -30,19 +30,19 @@ public class SinkPrometheusMetrics implements SinkMetrics {
 
     private final String tagName;
 
-    private static final Counter SINK_SUCCESS_COUNTER = Counter.build()
+    private final Counter sinkSuccessCounter = Counter.build()
             .name(AGENT_SINK_METRICS_PREFIX + SINK_SUCCESS_COUNTER_NAME)
             .help("The success message count in agent sink since agent started.")
             .labelNames("tag")
             .register();
 
-    private static final Counter SINK_FAIL_COUNTER = Counter.build()
+    private final Counter sinkFailCounter = Counter.build()
             .name(AGENT_SINK_METRICS_PREFIX + SINK_FAIL_COUNTER_NAME)
             .help("The failed message count in agent sink since agent started.")
             .labelNames("tag")
             .register();
 
-    public SinkPrometheusMetrics(String tagName) {
+    public SinkPrometheusMetric(String tagName) {
         this.tagName = tagName;
     }
 
@@ -53,21 +53,21 @@ public class SinkPrometheusMetrics implements SinkMetrics {
 
     @Override
     public void incSinkSuccessCount() {
-        SINK_SUCCESS_COUNTER.labels(tagName).inc();
+        sinkSuccessCounter.labels(tagName).inc();
     }
 
     @Override
     public long getSinkSuccessCount() {
-        return (long) SINK_SUCCESS_COUNTER.labels(tagName).get();
+        return (long) sinkSuccessCounter.labels(tagName).get();
     }
 
     @Override
     public void incSinkFailCount() {
-        SINK_FAIL_COUNTER.labels(tagName).inc();
+        sinkFailCounter.labels(tagName).inc();
     }
 
     @Override
     public long getSinkFailCount() {
-        return (long) SINK_FAIL_COUNTER.labels(tagName).get();
+        return (long) sinkFailCounter.labels(tagName).get();
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java
index 3ab89452d..2f551f877 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceJmxMetric.java
@@ -28,7 +28,7 @@ import org.apache.inlong.common.metric.MetricRegister;
 import java.util.concurrent.atomic.AtomicLong;
 
 @MetricDomain(name = "SourceMetric")
-public class SourceJmxMetric extends MetricItem implements SourceMetrics {
+public class SourceJmxMetric extends MetricItem implements SourceMetric {
 
     @Dimension
     private final String tagName;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetrics.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetric.java
similarity index 88%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetrics.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetric.java
index e71bda702..47b24721f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetrics.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourceMetric.java
@@ -19,10 +19,10 @@
 
 package org.apache.inlong.agent.plugin.metrics;
 
-public interface SourceMetrics {
+public interface SourceMetric {
 
     /**
-     *  The tag name of source metrics.
+     * The tag name of source metrics.
      */
     String getTagName();
 
@@ -32,7 +32,7 @@ public interface SourceMetrics {
     void incSourceSuccessCount();
 
     /**
-     *  Count of the source success metric.
+     * Count of the source success metric.
      */
     long getSourceSuccessCount();
 
@@ -42,7 +42,7 @@ public interface SourceMetrics {
     void incSourceFailCount();
 
     /**
-     *  Count of the source fail metric.
+     * Count of the source fail metric.
      */
     long getSourceFailCount();
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetrics.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetric.java
similarity index 77%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetrics.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetric.java
index 9fb52d5bd..babad8664 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetrics.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/metrics/SourcePrometheusMetric.java
@@ -19,28 +19,28 @@ package org.apache.inlong.agent.plugin.metrics;
 
 import io.prometheus.client.Counter;
 
-public class SourcePrometheusMetrics implements SourceMetrics {
+public class SourcePrometheusMetric implements SourceMetric {
 
+    // agent-source-metrics
     public static final String AGENT_SOURCE_METRICS_PREFIX = "inlong_agent_source_";
-
     public static final String SOURCE_SUCCESS_COUNTER_NAME = "success_count";
     public static final String SOURCE_FAIL_COUNTER_NAME = "fail_count";
 
-    private final String tagName;
-
-    private static final Counter SOURCE_SUCCESS_COUNTER = Counter.build()
+    // agent-source-counters
+    private final Counter sourceSuccessCounter = Counter.build()
             .name(AGENT_SOURCE_METRICS_PREFIX + SOURCE_SUCCESS_COUNTER_NAME)
             .help("The success message count in agent source since agent started.")
             .labelNames("tag")
             .register();
-
-    private static final Counter SOURCE_FAIL_COUNTER = Counter.build()
+    private final Counter sourceFailCounter = Counter.build()
             .name(AGENT_SOURCE_METRICS_PREFIX + SOURCE_FAIL_COUNTER_NAME)
             .help("The failed message count in agent source since agent started.")
             .labelNames("tag")
             .register();
 
-    public SourcePrometheusMetrics(String tagName) {
+    private String tagName;
+
+    public SourcePrometheusMetric(String tagName) {
         this.tagName = tagName;
     }
 
@@ -51,21 +51,21 @@ public class SourcePrometheusMetrics implements SourceMetrics {
 
     @Override
     public void incSourceSuccessCount() {
-        SOURCE_SUCCESS_COUNTER.labels(tagName).inc();
+        sourceSuccessCounter.labels(tagName).inc();
     }
 
     @Override
     public long getSourceSuccessCount() {
-        return (long) SOURCE_SUCCESS_COUNTER.labels(tagName).get();
+        return (long) sourceSuccessCounter.labels(tagName).get();
     }
 
     @Override
     public void incSourceFailCount() {
-        SOURCE_FAIL_COUNTER.labels(tagName).inc();
+        sourceFailCounter.labels(tagName).inc();
     }
 
     @Override
     public long getSourceFailCount() {
-        return (long) SOURCE_FAIL_COUNTER.labels(tagName).get();
+        return (long) sourceFailCounter.labels(tagName).get();
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
index 1d1035a2f..d62ca2d4f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
@@ -17,14 +17,9 @@
 
 package org.apache.inlong.agent.plugin.sinks;
 
-import com.google.common.base.Joiner;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.plugin.Sink;
-import org.apache.inlong.agent.plugin.metrics.SinkJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SinkMetrics;
-import org.apache.inlong.agent.plugin.metrics.SinkPrometheusMetrics;
-import org.apache.inlong.agent.utils.ConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,12 +38,10 @@ public abstract class AbstractSink implements Sink {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSink.class);
 
-    protected static SinkMetrics sinkMetric;
-
-    protected static SinkMetrics streamMetric;
     private static AtomicLong index = new AtomicLong(0);
     protected String inlongGroupId;
     protected String inlongStreamId;
+    protected String metricTagName;
 
     @Override
     public MessageFilter initMessageFilter(JobProfile jobConf) {
@@ -69,24 +62,4 @@ public abstract class AbstractSink implements Sink {
         inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
     }
 
-    /**
-     * init sinkMetric
-     *
-     * @param tagName metric tagName
-     */
-    protected void intMetric(String tagName) {
-        String label = Joiner.on(",").join(tagName, index.getAndIncrement());
-        if (ConfigUtil.isPrometheusEnabled()) {
-            sinkMetric = new SinkPrometheusMetrics(label);
-        } else {
-            sinkMetric = new SinkJmxMetric(label);
-        }
-        label = Joiner.on(",").join(tagName, inlongGroupId, inlongStreamId);
-        if (ConfigUtil.isPrometheusEnabled()) {
-            streamMetric = new SinkPrometheusMetrics(label);
-        } else {
-            streamMetric = new SinkJmxMetric(label);
-        }
-    }
-
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
index 1471fd677..b6a14a1f3 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ConsoleSink.java
@@ -17,10 +17,12 @@
 
 package org.apache.inlong.agent.plugin.sinks;
 
-import java.nio.charset.StandardCharsets;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
+
+import java.nio.charset.StandardCharsets;
 
 /**
  * message write to console
@@ -38,12 +40,10 @@ public class ConsoleSink extends AbstractSink {
         if (message != null) {
             System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
             // increment the count of successful sinks
-            sinkMetric.incSinkSuccessCount();
-            streamMetric.incSinkSuccessCount();
+            GlobalMetrics.incSinkSuccessCount(metricTagName);
         } else {
             // increment the count of failed sinks
-            sinkMetric.incSinkFailCount();
-            streamMetric.incSinkFailCount();
+            GlobalMetrics.incSinkFailCount(metricTagName);
         }
     }
 
@@ -60,7 +60,7 @@ public class ConsoleSink extends AbstractSink {
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
-        intMetric(CONSOLE_SINK_TAG_NAME);
+        metricTagName = CONSOLE_SINK_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
     }
 
     @Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 97fa7d3d8..b97116ad3 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -27,6 +27,7 @@ import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.plugin.message.PackProxyMessage;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.slf4j.Logger;
@@ -111,12 +112,10 @@ public class ProxySink extends AbstractSink {
                     AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
                             inlongGroupId, inlongStreamId, System.currentTimeMillis());
                     // increment the count of successful sinks
-                    sinkMetric.incSinkSuccessCount();
-                    streamMetric.incSinkSuccessCount();
+                    GlobalMetrics.incSinkSuccessCount(metricTagName);
                 } else {
                     // increment the count of failed sinks
-                    sinkMetric.incSinkFailCount();
-                    streamMetric.incSinkFailCount();
+                    GlobalMetrics.incSinkFailCount(metricTagName);
                 }
             }
         } catch (Exception e) {
@@ -183,7 +182,7 @@ public class ProxySink extends AbstractSink {
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
-        intMetric(PROXY_SINK_TAG_NAME);
+        metricTagName = PROXY_SINK_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
         syncSend = jobConf.getBoolean(PROXY_SEND_SYNC, false);
         maxBatchSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE);
         maxQueueNumber = jobConf.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index c0183afb6..86ab06bb8 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -23,11 +23,8 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.core.task.TaskPositionManager;
 import org.apache.inlong.agent.plugin.message.SequentialID;
-import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
 import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
@@ -79,11 +76,9 @@ public class SenderManager {
     private final int maxSenderRetry;
     private final long retrySleepTime;
     private final String inlongGroupId;
-    private TaskPositionManager taskPositionManager;
     private final int maxSenderPerGroup;
     private final String sourcePath;
-    private final PluginMetric metric;
-
+    private TaskPositionManager taskPositionManager;
     private int ioThreadNum;
     private boolean enableBusyWait;
     private Semaphore semaphore;
@@ -125,12 +120,6 @@ public class SenderManager {
 
         this.sourcePath = sourcePath;
         this.inlongGroupId = inlongGroupId;
-
-        if (ConfigUtil.isPrometheusEnabled()) {
-            this.metric = new PluginPrometheusMetric(SENDER_MANAGER_TAG_NAME);
-        } else {
-            this.metric = new PluginJmxMetric(SENDER_MANAGER_TAG_NAME);
-        }
     }
 
     /**
@@ -156,13 +145,13 @@ public class SenderManager {
     /**
      * sender
      *
-     * @param groupId group id
+     * @param tagName group id
      * @return DefaultMessageSender
      */
-    private DefaultMessageSender createMessageSender(String groupId) throws Exception {
+    private DefaultMessageSender createMessageSender(String tagName) throws Exception {
 
         ProxyClientConfig proxyClientConfig = new ProxyClientConfig(
-                localhost, isLocalVisit, managerHost, managerPort, groupId, netTag);
+                localhost, isLocalVisit, managerHost, managerPort, tagName, netTag);
         proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
         proxyClientConfig.setFile(isFile);
         proxyClientConfig.setAliveConnections(aliveConnectionNum);
@@ -192,50 +181,6 @@ public class SenderManager {
         senderList.add(sender);
     }
 
-    /**
-     * sender callback
-     */
-    private class AgentSenderCallback implements SendMessageCallback {
-
-        private final int retry;
-        private final String groupId;
-        private final List<byte[]> bodyList;
-        private final String streamId;
-        private final long dataTime;
-        private final String jobId;
-
-        AgentSenderCallback(String jobId, String groupId, String streamId, List<byte[]> bodyList, int retry,
-                long dataTime) {
-            this.retry = retry;
-            this.groupId = groupId;
-            this.streamId = streamId;
-            this.bodyList = bodyList;
-            this.jobId = jobId;
-            this.dataTime = dataTime;
-        }
-
-        @Override
-        public void onMessageAck(SendResult result) {
-            // if send result is not ok, retry again.
-            if (result == null || !result.equals(SendResult.OK)) {
-                LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime {} fail with times {}, "
-                        + "error {}", groupId, streamId, jobId, dataTime, retry, result);
-                sendBatchAsync(jobId, groupId, streamId, bodyList, retry + 1, dataTime);
-                return;
-            }
-            semaphore.release(bodyList.size());
-            metric.incSendSuccessNum(bodyList.size());
-            if (sourcePath != null) {
-                taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
-            }
-        }
-
-        @Override
-        public void onException(Throwable e) {
-            LOGGER.error("exception caught", e);
-        }
-    }
-
     /**
      * Send message to proxy by batch, use message cache.
      *
@@ -303,4 +248,48 @@ public class SenderManager {
         }
     }
 
+    /**
+     * sender callback
+     */
+    private class AgentSenderCallback implements SendMessageCallback {
+
+        private final int retry;
+        private final String groupId;
+        private final List<byte[]> bodyList;
+        private final String streamId;
+        private final long dataTime;
+        private final String jobId;
+
+        AgentSenderCallback(String jobId, String groupId, String streamId, List<byte[]> bodyList, int retry,
+                long dataTime) {
+            this.retry = retry;
+            this.groupId = groupId;
+            this.streamId = streamId;
+            this.bodyList = bodyList;
+            this.jobId = jobId;
+            this.dataTime = dataTime;
+        }
+
+        @Override
+        public void onMessageAck(SendResult result) {
+            // if send result is not ok, retry again.
+            if (result == null || !result.equals(SendResult.OK)) {
+                LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime {} fail with times {}, "
+                        + "error {}", groupId, streamId, jobId, dataTime, retry, result);
+                sendBatchAsync(jobId, groupId, streamId, bodyList, retry + 1, dataTime);
+                return;
+            }
+            semaphore.release(bodyList.size());
+            GlobalMetrics.incSendSuccessNum(groupId + "_" + streamId, bodyList.size());
+            if (sourcePath != null) {
+                taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
+            }
+        }
+
+        @Override
+        public void onException(Throwable e) {
+            LOGGER.error("exception caught", e);
+        }
+    }
+
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
index f13e1f49c..dfb0aba07 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/BinlogSource.java
@@ -20,18 +20,18 @@ package org.apache.inlong.agent.plugin.sources;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.SourceJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
-import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.plugin.sources.reader.BinlogReader;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 
 /**
  * binlog source, split binlog source job into multi readers
@@ -40,25 +40,19 @@ public class BinlogSource implements Source {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(BinlogSource.class);
     private static final String BINLOG_SOURCE_TAG_NAME = "BinlogSourceMetric";
-    private static AtomicLong metricsIndex = new AtomicLong(0);
-    private final SourceMetrics sourceMetrics;
 
     public BinlogSource() {
-        if (ConfigUtil.isPrometheusEnabled()) {
-            this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
-                    BINLOG_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
-        } else {
-            this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
-                    BINLOG_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
-        }
     }
 
     @Override
     public List<Reader> split(JobProfile conf) {
+        String inlongGroupId = conf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+        String inlongStreamId = conf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+        String metricTagName = BINLOG_SOURCE_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
         Reader binlogReader = new BinlogReader();
         List<Reader> readerList = new ArrayList<>();
         readerList.add(binlogReader);
-        sourceMetrics.incSourceSuccessCount();
+        GlobalMetrics.incSourceSuccessCount(metricTagName);
         return readerList;
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
index 1a0bcbda9..ee13ddb71 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/DatabaseSqlSource.java
@@ -17,43 +17,38 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.SourceJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
-import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.plugin.sources.reader.SqlReader;
 import org.apache.inlong.agent.utils.AgentDbUtils;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+
 /**
  * Make database as Source
  */
-public class DatabaseSqlSource  implements Source {
+public class DatabaseSqlSource implements Source {
+
     private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseSqlSource.class);
 
     private static final String JOB_DATABASE_SQL = "job.sql.command";
 
     private static final String DATABASE_SOURCE_TAG_NAME = "AgentDatabaseSourceMetric";
 
-    private final SourceMetrics sourceMetrics;
     private static AtomicLong metricsIndex = new AtomicLong(0);
 
     public DatabaseSqlSource() {
-        if (ConfigUtil.isPrometheusEnabled()) {
-            this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
-                DATABASE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
-        } else {
-            this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
-                DATABASE_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
-        }
     }
 
     /**
@@ -81,6 +76,9 @@ public class DatabaseSqlSource  implements Source {
      */
     @Override
     public List<Reader> split(JobProfile conf) {
+        String inlongGroupId = conf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+        String inlongStreamId = conf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+        String metricTagName = DATABASE_SOURCE_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
         String sqlPattern = conf.get(JOB_DATABASE_SQL, "").toLowerCase();
         List<Reader> readerList = null;
         if (!sqlPattern.isEmpty()) {
@@ -88,11 +86,11 @@ public class DatabaseSqlSource  implements Source {
         }
         if (readerList != null) {
             // increment the count of successful sources
-            sourceMetrics.incSourceSuccessCount();
+            GlobalMetrics.incSourceSuccessCount(metricTagName);
         } else {
             // database type or sql is incorrect
             // increment the count of failed sources
-            sourceMetrics.incSourceFailCount();
+            GlobalMetrics.incSourceFailCount(metricTagName);
         }
         return readerList;
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index 87af9c4e6..a817c4e12 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -23,12 +23,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.SourceJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
-import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.plugin.sources.reader.KafkaReader;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -42,6 +38,10 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET;
@@ -71,21 +71,15 @@ public class KafkaSource implements Source {
     private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms";
     private static final Gson gson = new Gson();
     private static AtomicLong metricsIndex = new AtomicLong(0);
-    private final SourceMetrics sourceMetrics;
 
     public KafkaSource() {
-        if (ConfigUtil.isPrometheusEnabled()) {
-            this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
-                    KAFKA_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
-        } else {
-            this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
-                    KAFKA_SOURCE_TAG_NAME, metricsIndex.incrementAndGet()));
-        }
-
     }
 
     @Override
     public List<Reader> split(JobProfile conf) {
+        String inlongGroupId = conf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+        String inlongStreamId = conf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+        String metricTagName = KAFKA_SOURCE_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
         List<Reader> result = new ArrayList<>();
         String filterPattern = conf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
 
@@ -141,6 +135,9 @@ public class KafkaSource implements Source {
                 addValidator(filterPattern, kafkaReader);
                 result.add(kafkaReader);
             }
+            GlobalMetrics.incSourceSuccessCount(metricTagName);
+        } else {
+            GlobalMetrics.incSourceFailCount(metricTagName);
         }
         return result;
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 9baeb352c..d1258685e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -20,13 +20,9 @@ package org.apache.inlong.agent.plugin.sources;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.metrics.SourceJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SourceMetrics;
-import org.apache.inlong.agent.plugin.metrics.SourcePrometheusMetrics;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.plugin.sources.reader.TextFileReader;
 import org.apache.inlong.agent.plugin.utils.PluginUtils;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +32,11 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
 import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
@@ -47,28 +47,20 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOU
  */
 public class TextFileSource implements Source {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(TextFileSource.class);
-
-    private static final String TEXT_FILE_SOURCE_TAG_NAME = "AgentTextFileSourceMetric";
-
     // path + suffix
     public static final String MD5_SUFFIX = ".md5";
-
-    private final SourceMetrics sourceMetrics;
+    private static final Logger LOGGER = LoggerFactory.getLogger(TextFileSource.class);
+    private static final String TEXT_FILE_SOURCE_TAG_NAME = "AgentTextFileSourceMetric";
     private static AtomicLong metricsIndex = new AtomicLong(0);
 
     public TextFileSource() {
-        if (ConfigUtil.isPrometheusEnabled()) {
-            this.sourceMetrics = new SourcePrometheusMetrics(AgentUtils.getUniqId(
-                    TEXT_FILE_SOURCE_TAG_NAME,  metricsIndex.incrementAndGet()));
-        } else {
-            this.sourceMetrics = new SourceJmxMetric(AgentUtils.getUniqId(
-                    TEXT_FILE_SOURCE_TAG_NAME,  metricsIndex.incrementAndGet()));
-        }
     }
 
     @Override
     public List<Reader> split(JobProfile jobConf) {
+        String inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+        String inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+        String metricTagName = TEXT_FILE_SOURCE_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
         Collection<File> allFiles = PluginUtils.findSuitFiles(jobConf);
         List<Reader> result = new ArrayList<>();
         String filterPattern = jobConf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
@@ -84,7 +76,7 @@ public class TextFileSource implements Source {
             result.add(textFileReader);
         }
         // increment the count of successful sources
-        sourceMetrics.incSourceSuccessCount();
+        GlobalMetrics.incSourceSuccessCount(metricTagName);
         return result;
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
index 7f5566d0e..e9ed0ac62 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
@@ -17,35 +17,23 @@
 
 package org.apache.inlong.agent.plugin.sources.reader;
 
-import com.google.common.base.Joiner;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.plugin.Reader;
-import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
-import org.apache.inlong.agent.utils.ConfigUtil;
-
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
 import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.KEY_METRICS_INDEX;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
 
 /**
  * abstract reader, init reader and reader metrics
  */
 public abstract class AbstractReader implements Reader {
 
-    protected static PluginMetric readerMetric;
-    protected static PluginMetric streamMetric;
-    private static AtomicLong metricsIndex = new AtomicLong(0);
     protected String inlongGroupId;
 
     protected String inlongStreamId;
+    protected String metricTagName;
 
     @Override
     public void init(JobProfile jobConf) {
@@ -53,30 +41,7 @@ public abstract class AbstractReader implements Reader {
         inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
     }
 
-    /**
-     * init reader metrics
-     *
-     * @param tagName metric tagName
-     */
-    protected void intMetric(String tagName) {
-        String metricsIndexValue = String.valueOf(metricsIndex.getAndIncrement());
-        String label = Joiner.on(",").join(tagName, metricsIndexValue);
-        String groupIdKV = PROXY_KEY_GROUP_ID + "=" + inlongGroupId;
-        String streamIdKV = PROXY_KEY_STREAM_ID + "=" + inlongStreamId;
-        String metricsIndexKV = KEY_METRICS_INDEX + "=" + metricsIndexValue;
-        if (ConfigUtil.isPrometheusEnabled()) {
-            readerMetric = new PluginPrometheusMetric(label);
-        } else {
-            label = Joiner.on(",").join(tagName, metricsIndexKV);
-            readerMetric = new PluginJmxMetric(label);
-        }
-        label = Joiner.on(",").join(tagName, inlongGroupId, inlongStreamId);
-        if (ConfigUtil.isPrometheusEnabled()) {
-            streamMetric = new PluginPrometheusMetric(label);
-        } else {
-            label = Joiner.on(",").join(tagName, groupIdKV, streamIdKV);
-            streamMetric = new PluginJmxMetric(label);
-        }
+    public String getInlongGroupId() {
+        return inlongGroupId;
     }
-
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 9ca78b5f7..7b7961a48 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -31,7 +31,7 @@ import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.constant.SnapshotModeConstants;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
 import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
 import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore;
@@ -60,7 +60,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
 /**
  * read binlog data
  */
-public class BinlogReader implements Reader {
+public class BinlogReader extends AbstractReader {
 
     public static final String COMPONENT_NAME = "BinlogReader";
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
@@ -81,6 +81,7 @@ public class BinlogReader implements Reader {
     public static final String JOB_DATABASE_QUEUE_SIZE = "job.binlogJob.queueSize";
     private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReader.class);
     private static final Gson gson = new Gson();
+    private static final String BINLOG_READER_TAG_NAME = "AgentBinlogMetric";
     private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
     /**
      * pair.left: table name
@@ -120,6 +121,7 @@ public class BinlogReader implements Reader {
     @Override
     public Message read() {
         if (!binlogMessagesQueue.isEmpty()) {
+            GlobalMetrics.incReadNum(metricTagName);
             return getBinlogMessage();
         } else {
             return null;
@@ -171,6 +173,7 @@ public class BinlogReader implements Reader {
                 CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID);
         inlongStreamId = jobConf.get(CommonConstants.PROXY_INLONG_STREAM_ID,
                 CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID);
+        metricTagName = BINLOG_READER_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
 
         if (enableReportConfigLog) {
             String reportConfigServerUrl = jobConf
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index fe28c79a6..93910e99a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -23,14 +23,10 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginMetric;
-import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.plugin.validator.PatternValidator;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -48,10 +44,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_BYTE_SPEED_LIMIT;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_OFFSET;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
@@ -61,7 +53,7 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_TOPIC;
 /**
  * read kafka data
  */
-public class KafkaReader<K, V> implements Reader {
+public class KafkaReader<K, V> extends AbstractReader {
 
     public static final int NEVER_STOP_SIGN = -1;
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReader.class);
@@ -77,7 +69,6 @@ public class KafkaReader<K, V> implements Reader {
     /* total readBytes */
     private static AtomicLong currentTotalReadBytes = new AtomicLong(0);
     private static AtomicLong lastTotalReadBytes = new AtomicLong(0);
-    private final PluginMetric kafkaMetric;
     KafkaConsumer<K, V> consumer;
     long lastTimestamp;
     /* bps: records/s */
@@ -103,18 +94,9 @@ public class KafkaReader<K, V> implements Reader {
      */
     public KafkaReader(KafkaConsumer<K, V> consumer, Map<String, String> paraMap) {
         this.consumer = consumer;
-        // metrics total readRecords
-        if (ConfigUtil.isPrometheusEnabled()) {
-            kafkaMetric = new PluginPrometheusMetric(AgentUtils.getUniqId(
-                    KAFKA_READER_TAG_NAME, currentTotalReadRecords.incrementAndGet()));
-        } else {
-            kafkaMetric = new PluginJmxMetric(AgentUtils.getUniqId(
-                    KAFKA_READER_TAG_NAME, currentTotalReadRecords.incrementAndGet()));
-        }
-
-        this.recordSpeed = Long.valueOf(paraMap.getOrDefault(JOB_KAFKA_RECORD_SPEED_LIMIT, "10000"));
-        this.byteSpeed = Long.valueOf(paraMap.getOrDefault(JOB_KAFKA_BYTE_SPEED_LIMIT, String.valueOf(1024 * 1024)));
-        this.flowControlInterval = Long.valueOf(paraMap.getOrDefault(KAFKA_SOURCE_READ_MIN_INTERVAL, "1000"));
+        this.recordSpeed = Long.parseLong(paraMap.getOrDefault(JOB_KAFKA_RECORD_SPEED_LIMIT, "10000"));
+        this.byteSpeed = Long.parseLong(paraMap.getOrDefault(JOB_KAFKA_BYTE_SPEED_LIMIT, String.valueOf(1024 * 1024)));
+        this.flowControlInterval = Long.parseLong(paraMap.getOrDefault(KAFKA_SOURCE_READ_MIN_INTERVAL, "1000"));
         this.lastTimestamp = System.currentTimeMillis();
         this.topic = paraMap.get(JOB_KAFKA_TOPIC);
 
@@ -140,7 +122,7 @@ public class KafkaReader<K, V> implements Reader {
                         "partition:" + record.partition()
                                 + ", value:" + new String(recordValue) + ", offset:" + record.offset());
                 // control speed
-                kafkaMetric.incReadNum();
+                GlobalMetrics.incReadNum(metricTagName);
                 // commit succeed,then record current offset
                 snapshot = record.partition() + JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
                 DefaultMessage message = new DefaultMessage(recordValue, headerMap);
@@ -188,13 +170,14 @@ public class KafkaReader<K, V> implements Reader {
 
     @Override
     public void init(JobProfile jobConf) {
+        super.init(jobConf);
+        // key the metric by group AND stream so streams sharing a group are counted separately
+        metricTagName = KAFKA_READER_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
         // get offset from jobConf
         snapshot = jobConf.get(JOB_KAFKA_OFFSET, null);
         initReadTimeout(jobConf);
         // fetch data
         fetchData(5000);
-        inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
-        inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
     }
 
     @Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index a27a50522..33f4f04a9 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -17,21 +17,6 @@
 
 package org.apache.inlong.agent.plugin.sources.reader;
 
-import static java.sql.Types.BINARY;
-import static java.sql.Types.BLOB;
-import static java.sql.Types.LONGVARBINARY;
-import static java.sql.Types.VARBINARY;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
-
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.CharUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -39,11 +24,25 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.utils.AgentDbUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BLOB;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.VARBINARY;
+
 /**
  * Read data from database by SQL
  */
@@ -119,16 +118,14 @@ public class SqlReader extends AbstractReader {
                 }
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
                         inlongGroupId, inlongStreamId, System.currentTimeMillis());
-                readerMetric.incReadNum();
-                streamMetric.incReadNum();
+                GlobalMetrics.incReadNum(metricTagName);
                 return generateMessage(lineColumns);
             } else {
                 finished = true;
             }
         } catch (Exception ex) {
             LOGGER.error("error while reading data", ex);
-            readerMetric.incReadFailedNum();
-            streamMetric.incReadFailedNum();
+            GlobalMetrics.incReadFailedNum(metricTagName);
             throw new RuntimeException(ex);
         }
         return null;
@@ -193,9 +190,8 @@ public class SqlReader extends AbstractReader {
 
     @Override
     public void init(JobProfile jobConf) {
-        inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);
-        inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, "");
-        intMetric(SQL_READER_TAG_NAME);
+        super.init(jobConf);
+        metricTagName = SQL_READER_TAG_NAME + "_" + inlongGroupId + "_" + inlongStreamId;
         int batchSize = jobConf.getInt(JOB_DATABASE_BATCH_SIZE, DEFAULT_JOB_DATABASE_BATCH_SIZE);
         String userName = jobConf.get(JOB_DATABASE_USER);
         String password = jobConf.get(JOB_DATABASE_PASSWORD);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index a627e7c98..7ec8e8ad1 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -24,6 +24,7 @@ import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Validator;
 import org.apache.inlong.agent.plugin.except.FileException;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.plugin.validator.PatternValidator;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.slf4j.Logger;
@@ -88,10 +89,7 @@ public class TextFileReader extends AbstractReader {
             if (validateMessage(message)) {
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
                         inlongGroupId, inlongStreamId, System.currentTimeMillis());
-                if (streamMetric != null) {
-                    readerMetric.incReadNum();
-                    streamMetric.incReadNum();
-                }
+                GlobalMetrics.incReadNum(metricTagName);
                 return new DefaultMessage(message.getBytes(StandardCharsets.UTF_8));
             }
         }
@@ -166,7 +164,7 @@ public class TextFileReader extends AbstractReader {
     public void init(JobProfile jobConf) {
         try {
             super.init(jobConf);
-            intMetric(TEXT_FILE_READER_TAG_NAME);
+            metricTagName = TEXT_FILE_READER_TAG_NAME + "_" + inlongGroupId;
             initReadTimeout(jobConf);
             String md5 = AgentUtils.getFileMd5(file);
             if (StringUtils.isNotBlank(this.md5) && !this.md5.equals(md5)) {
@@ -197,6 +195,6 @@ public class TextFileReader extends AbstractReader {
         }
         AgentUtils.finallyClose(stream);
         LOGGER.info("destroy reader with read {} num {}",
-                streamMetric.getTagName(), streamMetric.getReadNum());
+                metricTagName, GlobalMetrics.getReadNum(metricTagName));
     }
 }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/metrics/GlobalMetricsTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/metrics/GlobalMetricsTest.java
new file mode 100644
index 000000000..2fa69a7ed
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/metrics/GlobalMetricsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.metrics;
+
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertEquals;
+
+/** Checks that {@link GlobalMetrics} counters are kept separately per metric tag. */
+public class GlobalMetricsTest {
+
+    private String groupId1 = "groupId_test1";
+    private String groupId2 = "groupId_test2";
+    private String streamId = "streamId";
+    private String sinkTag = "File_Sink";
+    private String sourceTag = "File_Source";
+
+    @Test
+    public void testPluginMetric() {
+        String tag1 = groupId1 + "_" + streamId;
+        String tag2 = groupId2 + "_" + streamId;
+        GlobalMetrics.incReadNum(tag1);
+        assertEquals(1, GlobalMetrics.getReadNum(tag1));
+        assertEquals(0, GlobalMetrics.getSendSuccessNum(tag2));
+        GlobalMetrics.incSendSuccessNum(tag2, 10);
+        assertEquals(10, GlobalMetrics.getSendSuccessNum(tag2));
+        GlobalMetrics.incSendSuccessNum(tag2);
+        assertEquals(11, GlobalMetrics.getSendSuccessNum(tag2));
+    }
+
+    @Test
+    public void testSinkMetric() {
+        String tag = sinkTag + "_" + groupId1 + "_" + streamId;
+        GlobalMetrics.incSinkFailCount(tag);
+        assertEquals(1, GlobalMetrics.getSinkFailCount(tag));
+    }
+
+    @Test
+    public void testSourceMetric() {
+        String tag1 = sourceTag + "_" + groupId1 + "_" + streamId;
+        GlobalMetrics.incSourceSuccessCount(tag1);
+        GlobalMetrics.incSourceSuccessCount(tag1);
+        assertEquals(2, GlobalMetrics.getSourceSuccessCount(tag1));
+
+        // a tag that was never incremented must read as zero
+        String tag2 = sourceTag + "_" + groupId2 + "_" + streamId;
+        assertEquals(0, GlobalMetrics.getSourceSuccessCount(tag2));
+        GlobalMetrics.incSourceSuccessCount(tag2);
+        assertEquals(1, GlobalMetrics.getSourceSuccessCount(tag2));
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
index 52f4bd09c..0ce57bccc 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
@@ -17,47 +17,35 @@
 
 package org.apache.inlong.agent.plugin.sinks;
 
-import static org.apache.inlong.agent.constant.JobConstants.JOB_CYCLE_UNIT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_DATA_TIME;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.core.task.TaskPositionManager;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.plugin.Sink;
-import org.apache.inlong.agent.plugin.metrics.SinkJmxMetric;
-import org.apache.inlong.agent.plugin.metrics.SinkMetrics;
-import org.apache.inlong.agent.plugin.metrics.SinkPrometheusMetrics;
+import org.apache.inlong.agent.plugin.metrics.GlobalMetrics;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MockSink implements Sink {
+import java.util.concurrent.atomic.AtomicLong;
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(MockSink.class);
+import static org.apache.inlong.agent.constant.JobConstants.JOB_CYCLE_UNIT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_DATA_TIME;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
 
-    public static final String MOCK_SINK_TAG_NAME = "AgentMockSinkMetric";
+public class MockSink implements Sink {
 
+    public static final String MOCK_SINK_TAG_NAME = "AgentMockSinkMetric";
+    private static final Logger LOGGER = LoggerFactory.getLogger(MockSink.class);
     private final AtomicLong number = new AtomicLong(0);
+    public String tagName = MOCK_SINK_TAG_NAME + "_" + "groupIdTest" + "_" + "streamId";
     private TaskPositionManager taskPositionManager;
     private String sourceFileName;
     private String jobInstanceId;
     private long dataTime;
 
-    private final SinkMetrics sinkMetrics;
-    private static AtomicLong metricsIndex = new AtomicLong(0);
-
     public MockSink() {
-        if (ConfigUtil.isPrometheusEnabled()) {
-            this.sinkMetrics = new SinkPrometheusMetrics(AgentUtils.getUniqId(
-                    MOCK_SINK_TAG_NAME,  metricsIndex.incrementAndGet()));
-        } else {
-            this.sinkMetrics = new SinkJmxMetric(AgentUtils.getUniqId(
-                    MOCK_SINK_TAG_NAME,  metricsIndex.incrementAndGet()));
-        }
+
     }
 
     @Override
@@ -66,10 +54,10 @@ public class MockSink implements Sink {
             number.incrementAndGet();
             taskPositionManager.updateSinkPosition(jobInstanceId, sourceFileName, 1);
             // increment the count of successful sinks
-            sinkMetrics.incSinkSuccessCount();
+            GlobalMetrics.incSinkSuccessCount(tagName);
         } else {
             // increment the count of failed sinks
-            sinkMetrics.incSinkFailCount();
+            GlobalMetrics.incSinkFailCount(tagName);
         }
     }
 
@@ -88,7 +76,7 @@ public class MockSink implements Sink {
         taskPositionManager = TaskPositionManager.getTaskPositionManager();
         jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
         dataTime = AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
-            jobConf.get(JOB_CYCLE_UNIT, ""));
+                jobConf.get(JOB_CYCLE_UNIT, ""));
         LOGGER.info("get dataTime is : {}", dataTime);
     }