You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/19 03:07:52 UTC

[inlong] branch master updated: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 59352497b [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925)
59352497b is described below

commit 59352497b14fd0dd835a2038b699adc5266cf590
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Sep 19 11:07:46 2022 +0800

    [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925)
---
 .../inlong/dataproxy/consts/ConfigConstants.java   |   1 +
 .../inlong/dataproxy/http/HttpBaseSource.java      |   9 +-
 .../dataproxy/http/SimpleMessageHandler.java       |  35 ++---
 .../dataproxy/metrics/DataProxyMetricItemSet.java  | 128 +++++++++++++++
 .../apache/inlong/dataproxy/sink/PulsarSink.java   | 174 ++++++++++-----------
 .../org/apache/inlong/dataproxy/sink/TubeSink.java | 121 +++++---------
 .../dataproxy/sink/pulsar/PulsarClientService.java |   8 +-
 .../dataproxy/sink/pulsar/SendMessageCallBack.java |   2 +-
 .../apache/inlong/dataproxy/source/BaseSource.java |  24 ++-
 .../dataproxy/source/ServerMessageHandler.java     |  65 +++-----
 .../dataproxy/source/SimpleMessageHandler.java     |  11 +-
 .../inlong/dataproxy/source/SimpleTcpSource.java   |  14 --
 .../inlong/dataproxy/source/SimpleUdpSource.java   |   1 +
 13 files changed, 320 insertions(+), 273 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 1a6fcada1..ae6bc380f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -96,6 +96,7 @@ public class ConfigConstants {
     public static final String CLUSTER_ID_KEY = "clusterId";
     public static final String MANAGER_HOST = "manager.hosts";
     public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";
+    public static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy";
     public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
     public static final String PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
     public static final String PROXY_REPORT_IP = "proxy.report.ip";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
index 84e45e2fd..973bc128c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
@@ -29,6 +29,7 @@ import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
+import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.utils.ConfStringUtils;
@@ -77,7 +78,13 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource,
                     statIntervalSec, maxMonitorCnt);
         }
         // register metrics
-        this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+        ConfigManager configManager = ConfigManager.getInstance();
+        String clusterId =
+                configManager.getCommonProperties().getOrDefault(
+                        ConfigConstants.PROXY_CLUSTER_NAME,
+                        ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+        this.metricItemSet =
+                new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port));
         MetricRegister.register(metricItemSet);
         super.start();
         logger.info("{} started!", this.getName());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 16bb5c917..c5ca7c1d7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -36,7 +36,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.http.exception.MessageProcessException;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source.ServiceDecoder;
@@ -175,14 +174,14 @@ public class SimpleMessageHandler implements MessageHandler {
                         intMsgCnt, 1, data.length, 0);
                 monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
             }
-            addMetric(true, data.length, event);
+            addStatistics(true, data.length, event);
         } catch (ChannelException ex) {
             if (monitorIndex != null) {
                 monitorIndex.addAndGet(strBuff.toString(),
                         0, 0, 0, intMsgCnt);
                 monitorIndexExt.incrementAndGet("EVENT_DROPPED");
             }
-            addMetric(false, data.length, event);
+            addStatistics(false, data.length, event);
             logCounter++;
             if (logCounter == 1 || logCounter % 1000 == 0) {
                 LOG.error("Error writing to channel, and will retry after 1s, ex={},"
@@ -223,31 +222,19 @@ public class SimpleMessageHandler implements MessageHandler {
     }
 
     /**
-     * add audit metric
+     * add statistics information
      *
-     * @param result  success or failure
+     * @param isSuccess  success or failure
      * @param size    message size
      * @param event   message event
      */
-    private void addMetric(boolean result, long size, Event event) {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
-        dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, this.metricItemSet.getName());
-        dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, this.metricItemSet.getName());
-        DataProxyMetricItem.fillInlongId(event, dimensions);
-        DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
-        DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-        if (result) {
-            metricItem.readSuccessCount.incrementAndGet();
-            metricItem.readSuccessSize.addAndGet(size);
-            try {
-                AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event);
-            } catch (Exception e) {
-                LOG.error("add audit metric has exception e= {}", e);
-            }
-        } else {
-            metricItem.readFailCount.incrementAndGet();
-            metricItem.readFailSize.addAndGet(size);
+    private void addStatistics(boolean isSuccess, long size, Event event) {
+        if (event == null) {
+            return;
+        }
+        metricItemSet.fillSrcMetricItemsByEvent(event, isSuccess, size);
+        if (isSuccess) {
+            AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event);
         }
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index 811442428..e420e03a2 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -17,8 +17,15 @@
 
 package org.apache.inlong.dataproxy.metrics;
 
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.flume.Event;
 import org.apache.inlong.common.metric.MetricDomain;
 import org.apache.inlong.common.metric.MetricItemSet;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
 
 /**
  * 
@@ -26,6 +33,8 @@ import org.apache.inlong.common.metric.MetricItemSet;
  */
 @MetricDomain(name = "DataProxy")
 public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
+    private String clusterId = null;
+    private String sourceDataId = null;
 
     /**
      * Constructor
@@ -36,6 +45,125 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
         super(name);
     }
 
+    /**
+     * Constructor
+     *
+     * @param clusterId  the cluster id
+     * @param name    the module name
+     */
+    public DataProxyMetricItemSet(String clusterId, String name) {
+        super(name);
+        this.clusterId = clusterId;
+    }
+
+    /**
+     * Constructor
+     *
+     * @param clusterId  the cluster id
+     * @param name       the module name
+     * @param sourceDataId   the source data id
+     */
+    public DataProxyMetricItemSet(String clusterId, String name, String sourceDataId) {
+        super(name);
+        this.clusterId = clusterId;
+        this.sourceDataId = sourceDataId;
+    }
+
+    /**
+     * Fill source metric items by event
+     *
+     * @param event    the event object
+     * @param isSuccess  whether success read
+     * @param size       the message size
+     */
+    public void fillSrcMetricItemsByEvent(Event event, boolean isSuccess, long size) {
+        fillMetricItemsByEvent(event, true, true, isSuccess, size, 0);
+    }
+
+    /**
+     * Fill sink metric items by event
+     *
+     * @param event    the event object
+     * @param isSuccess  whether success read or send
+     * @param size       the message size
+     */
+    public void fillSinkReadMetricItemsByEvent(Event event, boolean isSuccess, long size) {
+        fillMetricItemsByEvent(event, false, true, isSuccess, size, 0);
+    }
+
+    /**
+     * Fill sink send metric items by event
+     *
+     * @param event    the event object
+     * @param sentTime   the sent time
+     * @param isSuccess  whether success read or send
+     * @param size       the message size
+     */
+    public void fillSinkSendMetricItemsByEvent(Event event, long sentTime,
+                                               boolean isSuccess, long size) {
+        fillMetricItemsByEvent(event, false, false, isSuccess, size, sentTime);
+    }
+
+    /**
+     * Fill metric items by event
+     *
+     * @param event    the event object
+     * @param isSource   whether source part
+     * @param isReadOp   whether read operation
+     * @param isSuccess  whether success read or send
+     * @param size       the message size
+     */
+    private void fillMetricItemsByEvent(Event event, boolean isSource,
+                                        boolean isReadOp, boolean isSuccess,
+                                        long size, long sendTime) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, clusterId);
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID,
+                event.getHeaders().get(AttributeConstants.GROUP_ID));
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID,
+                event.getHeaders().get(AttributeConstants.STREAM_ID));
+        long dataTime = NumberUtils.toLong(
+                event.getHeaders().get(AttributeConstants.DATA_TIME));
+        long auditFormatTime = dataTime - dataTime % CommonPropertiesHolder.getAuditFormatInterval();
+        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+        if (isSource) {
+            dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, name);
+            dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, sourceDataId);
+        } else {
+            dimensions.put(DataProxyMetricItem.KEY_SINK_ID, name);
+            dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+                    event.getHeaders().get(ConfigConstants.TOPIC_KEY));
+        }
+        DataProxyMetricItem metricItem = findMetricItem(dimensions);
+        if (isReadOp) {
+            if (isSuccess) {
+                metricItem.readSuccessCount.incrementAndGet();
+                metricItem.readSuccessSize.addAndGet(size);
+            } else {
+                metricItem.readFailCount.incrementAndGet();
+                metricItem.readFailSize.addAndGet(size);
+            }
+        } else {
+            if (isSuccess) {
+                metricItem.sendSuccessCount.incrementAndGet();
+                metricItem.sendSuccessSize.addAndGet(event.getBody().length);
+                if (sendTime > 0) {
+                    long currentTime = System.currentTimeMillis();
+                    long msgDataTimeL = Long.parseLong(
+                            event.getHeaders().get(AttributeConstants.DATA_TIME));
+                    long msgRcvTimeL = Long.parseLong(
+                            event.getHeaders().get(AttributeConstants.RCV_TIME));
+                    metricItem.sinkDuration.addAndGet(currentTime - sendTime);
+                    metricItem.nodeDuration.addAndGet(currentTime - msgRcvTimeL);
+                    metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL);
+                }
+            } else {
+                metricItem.sendFailCount.incrementAndGet();
+                metricItem.sendFailSize.addAndGet(event.getBody().length);
+            }
+        }
+    }
+
     /**
      * createItem
      * 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 69da396bc..3785ae993 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.dataproxy.sink;
 
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
+
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -24,6 +27,16 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.RateLimiter;
+import javax.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import io.netty.handler.codec.TooLongFrameException;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -45,8 +58,8 @@ import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
 import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack;
 import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService;
 import org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack;
@@ -62,19 +75,6 @@ import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedExcepti
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
-
 /**
  * Use pulsarSink need adding such config, if these ara not config in dataproxy-pulsar.conf,
  * PulsarSink will use default value.
@@ -172,7 +172,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
     /*
      *  metric
      */
-    private Map<String, String> dimensions;
     private DataProxyMetricItemSet metricItemSet;
     private ConfigManager configManager;
     private Map<String, String> topicProperties;
@@ -196,7 +195,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
         topicProperties = configManager.getTopicProperties();
         pulsarCluster = configManager.getMqClusterUrl2Token();
         pulsarConfig = configManager.getMqClusterConfig(); //pulsar common config
-        Map<String, String> commonProperties = configManager.getCommonProperties();
         sinkThreadPoolSize = pulsarConfig.getThreadNum();
         if (sinkThreadPoolSize <= 0) {
             sinkThreadPoolSize = 1;
@@ -294,11 +292,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
     @Override
     public void start() {
         logger.info("[{}] pulsar sink starting...", getName());
-        // register metrics
-        this.dimensions = new HashMap<>();
-        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
-        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
-
         sinkCounter.start();
         pulsarClientService.initCreateConnection(this);
 
@@ -326,8 +319,13 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
             sinkThreadPool[i].setName(getName() + "_pulsar_sink_sender-" + i);
             sinkThreadPool[i].start();
         }
-
-        this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+        // register metricItemSet
+        ConfigManager configManager = ConfigManager.getInstance();
+        String clusterId =
+                configManager.getCommonProperties().getOrDefault(
+                        ConfigConstants.PROXY_CLUSTER_NAME,
+                        ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+        this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
         MetricRegister.register(metricItemSet);
         this.canTake = true;
         logger.info("[{}] Pulsar sink started", getName());
@@ -405,19 +403,13 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                             + "last long time it will cause memoryChannel full and fileChannel write.)", getName());
                     tx.rollback();
                     // metric
-                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                            event.getHeaders().get(ConfigConstants.TOPIC_KEY));
-                    DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-                    metricItem.readFailCount.incrementAndGet();
-                    metricItem.readFailSize.addAndGet(event.getBody().length);
+                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
+                            event, false, event.getBody().length);
                 } else {
                     tx.commit();
                     // metric
-                    dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                            event.getHeaders().get(ConfigConstants.TOPIC_KEY));
-                    DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-                    metricItem.readSuccessCount.incrementAndGet();
-                    metricItem.readSuccessSize.addAndGet(event.getBody().length);
+                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
+                            event, true, event.getBody().length);
                 }
             } else {
                 status = Status.BACKOFF;
@@ -437,37 +429,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
         return status;
     }
 
-    private void editStatistic(final Event event, boolean isSuccess, boolean isOrder) {
-        if (event == null
-                || pulsarConfig.getStatIntervalSec() <= 0) {
-            return;
-        }
-        // get statistic items
-        String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
-        String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-        String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
-        int intMsgCnt = Integer.parseInt(
-                event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
-        long dataTimeL = Long.parseLong(
-                event.getHeaders().get(AttributeConstants.DATA_TIME));
-        String orderType = isOrder ? "order" : "non-order";
-        StringBuilder newBase = new StringBuilder(512)
-                .append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
-                .append(streamId).append(SEPARATOR).append(nodeIp)
-                .append(SEPARATOR).append(NetworkUtils.getLocalIp())
-                .append(SEPARATOR).append(orderType).append(SEPARATOR)
-                .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
-        long messageSize = event.getBody().length;
-        if (isSuccess) {
-            monitorIndex.addAndGet(newBase.toString(), intMsgCnt, 1, messageSize, 0);
-        } else {
-            monitorIndex.addAndGet(newBase.toString(), 0, 0, 0, intMsgCnt);
-            if (logPrinterB.shouldPrint()) {
-                logger.warn("error cannot send event, {} event size is {}", topic, messageSize);
-            }
-        }
-    }
-
     @Override
     public void handleCreateClientSuccess(String url) {
         logger.info("createConnection success for url = {}", url);
@@ -481,7 +442,8 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
     }
 
     @Override
-    public void handleMessageSendSuccess(String topic, Object result, EventStat eventStat) {
+    public void handleMessageSendSuccess(String topic, Object result,
+                                         EventStat eventStat, long startTime) {
         /*
          * Statistics pulsar performance
          */
@@ -505,20 +467,11 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                     getName(), (nowCnt - oldCnt), (t2 - t1));
             t1 = t2;
         }
-        Map<String, String> dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
-        DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-        metricItem.sendSuccessCount.incrementAndGet();
-        metricItem.sendSuccessSize.addAndGet(eventStat.getEvent().getBody().length);
-        metricItem.sendCount.incrementAndGet();
-        metricItem.sendSize.addAndGet(eventStat.getEvent().getBody().length);
-        monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
-        editStatistic(eventStat.getEvent(), true, eventStat.isOrderMessage());
-
+        addStatistics(eventStat, true, startTime);
     }
 
     @Override
     public void handleMessageSendException(String topic, EventStat eventStat, Object e) {
-        monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
         boolean needRetry = true;
         if (e instanceof NotFoundException) {
             logger.error("NotFoundException for topic " + topic + ", message will be discard!", e);
@@ -533,26 +486,67 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
             if (logPrinterB.shouldPrint()) {
                 logger.error("send failed for " + getName(), e);
             }
-            if (eventStat.getRetryCnt() == 0) {
-                editStatistic(eventStat.getEvent(), false, eventStat.isOrderMessage());
-            }
         }
-        Map<String, String> dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
-        DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-        metricItem.sendFailCount.incrementAndGet();
-        metricItem.sendFailSize.addAndGet(eventStat.getEvent().getBody().length);
+        addStatistics(eventStat, false, 0);
         eventStat.incRetryCnt();
         if (!eventStat.isOrderMessage() && needRetry) {
             processResendEvent(eventStat);
         }
     }
 
-    private Map<String, String> getNewDimension(String otherKey, String value) {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
-        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
-        dimensions.put(otherKey, value);
-        return dimensions;
+    /**
+     * Add statistics information
+     *
+     * @param eventStat   the statistic event
+     * @param isSuccess  is processed successfully
+     * @param sendTime   the send time when success processed
+     */
+    private void addStatistics(EventStat eventStat, boolean isSuccess, long sendTime) {
+        if (eventStat == null || eventStat.getEvent() == null) {
+            return;
+        }
+        Event event = eventStat.getEvent();
+        // add jmx metric items;
+        PulsarSink.this.metricItemSet.fillSinkSendMetricItemsByEvent(
+                event, sendTime, isSuccess, event.getBody().length);
+        if (isSuccess) {
+            AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
+        }
+        if (pulsarConfig.getStatIntervalSec() <= 0) {
+            return;
+        }
+        // add monitor items base file storage
+        String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
+        String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
+        String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
+        int intMsgCnt = Integer.parseInt(
+                event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
+        long dataTimeL = Long.parseLong(
+                event.getHeaders().get(AttributeConstants.DATA_TIME));
+        String orderType = eventStat.isOrderMessage() ? "order" : "non-order";
+        // build statistic key
+        StringBuilder newBase = new StringBuilder(512)
+                .append(getName()).append(SEP_HASHTAG).append(topic)
+                .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
+                .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
+                .append(SEP_HASHTAG).append(orderType).append(SEP_HASHTAG)
+                .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+        // count data
+        if (isSuccess) {
+            monitorIndex.addAndGet(newBase.toString(),
+                    intMsgCnt, 1, event.getBody().length, 0);
+            monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
+        } else {
+            monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
+            if (eventStat.getRetryCnt() == 0) {
+                monitorIndex.addAndGet(newBase.toString(),
+                        0, 0, 0, intMsgCnt);
+                if (logPrinterB.shouldPrint()) {
+                    logger.warn("error cannot send event, {} event size is {}",
+                            topic, event.getBody().length);
+                }
+            }
+        }
     }
 
     private boolean processEvent(EventStat eventStat) {
@@ -626,7 +620,9 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                 }
             }
         } catch (Throwable throwable) {
-            monitorIndexExt.incrementAndGet("PULSAR_SINK_DROPPED");
+            if (pulsarConfig.getStatIntervalSec() > 0) {
+                monitorIndexExt.incrementAndGet("PULSAR_SINK_DROPPED");
+            }
             if (logPrinterC.shouldPrint()) {
                 logger.error(getName() + " Discard msg because put events to both of "
                         + "queue and fileChannel fail", throwable);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index f1f2b6bbb..86787136a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -21,7 +21,6 @@ import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
 
 import com.google.common.base.Preconditions;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -53,7 +52,6 @@ import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
 import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
@@ -94,7 +92,6 @@ public class TubeSink extends AbstractSink implements Configurable {
     // used for RoundRobin different cluster while send message
     private RateLimiter diskRateLimiter;
     private Thread[] sinkThreadPool;
-    private Map<String, String> dimensions;
     private DataProxyMetricItemSet metricItemSet;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private static final LogCounter LOG_SINK_TASK_PRINTER =
@@ -186,12 +183,13 @@ public class TubeSink extends AbstractSink implements Configurable {
             monitorIndexExt = new MonitorIndexExt("Tube_Sink_monitors#" + this.getName(),
                     statIntervalSec, maxMonitorCnt);
         }
-        // initial dimensions
-        this.dimensions = new HashMap<>();
-        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
-        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
         // register metrics
-        this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+        ConfigManager configManager = ConfigManager.getInstance();
+        String clusterId =
+                configManager.getCommonProperties().getOrDefault(
+                        ConfigConstants.PROXY_CLUSTER_NAME,
+                        ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+        this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
         MetricRegister.register(metricItemSet);
         // create tube connection
         try {
@@ -272,27 +270,17 @@ public class TubeSink extends AbstractSink implements Configurable {
                 if (diskRateLimiter != null) {
                     diskRateLimiter.acquire(event.getBody().length);
                 }
-                Map<String, String> dimensions;
-                dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                        event.getHeaders().getOrDefault(ConfigConstants.TOPIC_KEY, ""));
                 if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
                     tx.commit();
                     cachedMsgCnt.incrementAndGet();
-                    DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-                    metricItem.readSuccessCount.incrementAndGet();
-                    metricItem.readSuccessSize.addAndGet(event.getBody().length);
+                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
+                            event, true, event.getBody().length);
                 } else {
                     tx.rollback();
-                    //logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
-                    //        + "--> TubeMQ, check if TubeMQ server or network is ok.(if this situation last long time "
-                    //        + "it will cause memoryChannel full and fileChannel write.)", getName());
-                    // metric
-                    DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-                    metricItem.readFailCount.incrementAndGet();
-                    metricItem.readFailSize.addAndGet(event.getBody().length);
+                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
+                            event, false, event.getBody().length);
                 }
             } else {
-                // logger.info("[{}]No data to process in the channel.",getName());
                 status = Status.BACKOFF;
                 tx.commit();
             }
@@ -436,17 +424,9 @@ public class TubeSink extends AbstractSink implements Configurable {
                 successMsgCnt.incrementAndGet();
                 inflightMsgCnt.decrementAndGet();
                 takenMsgCnt.decrementAndGet();
-                this.addMetric(myEventStat.getEvent(), true, sendTime);
-                if (statIntervalSec > 0) {
-                    monitorIndexExt.incrementAndGet(KEY_SINK_SUCCESS);
-                }
-                this.editStatistic(myEventStat.getEvent(), true);
+                this.addStatistics(myEventStat.getEvent(), true, false, sendTime);
             } else {
-                this.addMetric(myEventStat.getEvent(), false, 0);
-                if (statIntervalSec > 0) {
-                    monitorIndexExt.incrementAndGet(KEY_SINK_FAILURE);
-                }
-                this.editStatistic(myEventStat.getEvent(), false);
+                this.addStatistics(myEventStat.getEvent(), false, false, 0);
                 if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
                     logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
                             result.getErrMsg(), resendQueue.size(),
@@ -464,50 +444,34 @@ public class TubeSink extends AbstractSink implements Configurable {
 
         @Override
         public void onException(final Throwable e) {
-            if (statIntervalSec > 0) {
-                monitorIndexExt.incrementAndGet(KEY_SINK_EXP);
-            }
-            this.editStatistic(myEventStat.getEvent(), false);
+            addStatistics(myEventStat.getEvent(), false, true, 0);
             resendEvent(myEventStat, true);
         }
 
         /**
-         * addMetric
+         * Add statistics information
+         *
+         * @param event   the statistic event
+         * @param isSuccess  is processed successfully
+         * @param isException is exception when failure processed
+         * @param sendTime   the send time when success processed
          */
-        private void addMetric(Event event, boolean result, long sendTime) {
-            Map<String, String> dimensions = new HashMap<>();
-            dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, TubeSink.this.getName());
-            dimensions.put(DataProxyMetricItem.KEY_SINK_ID, TubeSink.this.getName());
-            dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
-                    event.getHeaders().get(ConfigConstants.TOPIC_KEY));
-            DataProxyMetricItem.fillInlongId(event, dimensions);
-            DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
-            DataProxyMetricItem metricItem = TubeSink.this.metricItemSet.findMetricItem(dimensions);
-            if (result) {
-                metricItem.sendSuccessCount.incrementAndGet();
-                metricItem.sendSuccessSize.addAndGet(event.getBody().length);
+        private void addStatistics(Event event, boolean isSuccess,
+                                   boolean isException, long sendTime) {
+            if (event == null) {
+                return;
+            }
+            // add jmx metric items;
+            TubeSink.this.metricItemSet.fillSinkSendMetricItemsByEvent(
+                    event, sendTime, isSuccess, event.getBody().length);
+            // add audit items;
+            if (isSuccess) {
                 AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
-                if (sendTime > 0) {
-                    long currentTime = System.currentTimeMillis();
-                    long msgDataTimeL = Long.parseLong(
-                            event.getHeaders().get(AttributeConstants.DATA_TIME));
-                    long msgRcvTimeL = Long.parseLong(
-                            event.getHeaders().get(AttributeConstants.RCV_TIME));
-                    metricItem.sinkDuration.addAndGet(currentTime - sendTime);
-                    metricItem.nodeDuration.addAndGet(currentTime - msgRcvTimeL);
-                    metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL);
-                }
-            } else {
-                metricItem.sendFailCount.incrementAndGet();
-                metricItem.sendFailSize.addAndGet(event.getBody().length);
             }
-        }
-
-        private void editStatistic(final Event event, boolean isSuccess) {
-            if (event == null || statIntervalSec <= 0) {
+            if (statIntervalSec <= 0) {
                 return;
             }
-            // get statistic items
+            // add monitor items base file storage
             String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
             String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
             String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
@@ -526,9 +490,14 @@ public class TubeSink extends AbstractSink implements Configurable {
             if (isSuccess) {
                 monitorIndex.addAndGet(newBase.toString(),
                         intMsgCnt, 1, event.getBody().length, 0);
+                monitorIndexExt.incrementAndGet(KEY_SINK_SUCCESS);
             } else {
                 monitorIndex.addAndGet(newBase.toString(),
                         0, 0, 0, intMsgCnt);
+                monitorIndexExt.incrementAndGet(KEY_SINK_FAILURE);
+                if (isException) {
+                    monitorIndexExt.incrementAndGet(KEY_SINK_EXP);
+                }
             }
         }
     }
@@ -590,14 +559,6 @@ public class TubeSink extends AbstractSink implements Configurable {
         }
     }
 
-    private Map<String, String> getNewDimension(String otherKey, String value) {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
-        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
-        dimensions.put(otherKey, value);
-        return dimensions;
-    }
-
     /**
      * Differentiate unpublished topic sets and publish them
      * attention: only append added topics
@@ -695,14 +656,4 @@ public class TubeSink extends AbstractSink implements Configurable {
         }
         return tmpMasterAddr;
     }
-
-    /**
-     * get metricItemSet
-     *
-     * @return the metricItemSet
-     */
-    private DataProxyMetricItemSet getMetricItemSet() {
-        return metricItemSet;
-    }
-
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 7ef6228f9..6510bce57 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -34,7 +34,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.sink.EventStat;
 import org.apache.inlong.dataproxy.source.MsgType;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
@@ -175,6 +174,7 @@ public class PulsarClientService {
         // build and send message
         Map<String, String> proMap =
                 MessageUtils.getXfsAttrs(event.getHeaders(), pkgVersion);
+        long startTime = System.currentTimeMillis();
         if (es.isOrderMessage()) {
             String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
             try {
@@ -183,8 +183,7 @@ public class PulsarClientService {
                         .key(partitionKey)
                         .value(event.getBody())
                         .send();
-                sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es);
-                AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
+                sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es, startTime);
                 forCallBackP.setCanUseSend(true);
                 result = true;
             } catch (PulsarClientException ex) {
@@ -204,9 +203,8 @@ public class PulsarClientService {
                     .value(event.getBody())
                     .sendAsync()
                     .thenAccept((msgId) -> {
-                        AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
                         forCallBackP.setCanUseSend(true);
-                        sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es);
+                        sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es, startTime);
                     })
                     .exceptionally((e) -> {
                         forCallBackP.setCanUseSend(false);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
index ce029759c..4d5b3a393 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
@@ -21,7 +21,7 @@ import org.apache.inlong.dataproxy.sink.EventStat;
 
 public interface SendMessageCallBack {
 
-    void handleMessageSendSuccess(String topic, Object msgId, EventStat es);
+    void handleMessageSendSuccess(String topic, Object msgId, EventStat es, long startTime);
 
     void handleMessageSendException(String topic, EventStat es, Object exception);
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 12d68bf2a..8fed4bbc3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -34,10 +34,13 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
+import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,6 +86,8 @@ public abstract class BaseSource
 
   protected boolean customProcessor = false;
 
+  private DataProxyMetricItemSet metricItemSet;
+
   /*
    * monitor
    */
@@ -157,6 +162,15 @@ public abstract class BaseSource
       FailoverChannelProcessorHolder.setChannelProcessor(newProcessor);
     }
     super.start();
+    // initial metric item set
+    ConfigManager configManager = ConfigManager.getInstance();
+    String clusterId =
+            configManager.getCommonProperties().getOrDefault(
+                    ConfigConstants.PROXY_CLUSTER_NAME,
+                    ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+    this.metricItemSet =
+            new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port));
+    MetricRegister.register(metricItemSet);
     /*
      * init monitor logic
      */
@@ -298,7 +312,7 @@ public abstract class BaseSource
       ServiceDecoder serviceDecoder = (ServiceDecoder)Class.forName(serviceDecoderName).newInstance();
       Class<? extends ChannelInitializer> clazz =
               (Class<? extends ChannelInitializer>) Class.forName(msgFactoryName);
-      Constructor ctor = clazz.getConstructor(AbstractSource.class, ChannelGroup.class,
+      Constructor ctor = clazz.getConstructor(BaseSource.class, ChannelGroup.class,
               String.class, ServiceDecoder.class, String.class, Integer.class,
               String.class, String.class, Boolean.class,
               Integer.class, Boolean.class, MonitorIndex.class,
@@ -319,6 +333,14 @@ public abstract class BaseSource
     return fac;
   }
 
+  /**
+   * get metricItemSet
+   * @return the metricItemSet
+   */
+  public DataProxyMetricItemSet getMetricItemSet() {
+    return metricItemSet;
+  }
+
   public Context getContext() {
     return context;
   }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 6e92cb6ba..e4a436448 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -42,7 +42,6 @@ import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.event.EventBuilder;
-import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.common.msg.InLongMsg;
@@ -53,7 +52,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.exception.MessageIDException;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.utils.DateTimeUtils;
@@ -82,7 +80,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             .on(AttributeConstants.SEPARATOR)
             .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
 
-    private AbstractSource source;
+    private BaseSource source;
 
     private final ChannelGroup allChannels;
 
@@ -124,7 +122,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
      * @param monitorIndexExt MonitorIndexExt
      * @param protocolType protocolType
      */
-    public ServerMessageHandler(AbstractSource source, ServiceDecoder serviceDecoder,
+    public ServerMessageHandler(BaseSource source, ServiceDecoder serviceDecoder,
             ChannelGroup allChannels,
             String topic, String attr, Boolean filterEmptyMsg,
             Integer maxCons, Boolean isCompressed, MonitorIndex monitorIndex,
@@ -137,16 +135,11 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
         if (null != attr) {
             this.defaultMXAttr = attr;
         }
-
         this.filterEmptyMsg = filterEmptyMsg;
         this.isCompressed = isCompressed;
         this.maxConnections = maxCons;
         this.protocolType = protocolType;
-        if (source instanceof SimpleTcpSource) {
-            this.metricItemSet = ((SimpleTcpSource) source).getMetricItemSet();
-        } else {
-            this.metricItemSet = new DataProxyMetricItemSet(this.toString());
-        }
+        this.metricItemSet = source.getMetricItemSet();
         this.monitorIndex = monitorIndex;
         this.monitorIndexExt = monitorIndexExt;
     }
@@ -461,7 +454,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 try {
                     processor.processEvent(event);
                     monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
-                    this.addMetric(true, data.length, event);
+                    this.addStatistics(true, data.length, event);
                     monitorIndex.addAndGet(strBuff.toString(),
                             streamMsgCnt, 1, data.length, 0);
                     strBuff.delete(0, strBuff.length());
@@ -469,7 +462,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                     logger.error("Error writting to channel,data will discard.", ex);
                     monitorIndexExt.incrementAndGet("EVENT_DROPPED");
                     monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, streamMsgCnt);
-                    this.addMetric(false, data.length, event);
+                    this.addStatistics(false, data.length, event);
                     strBuff.delete(0, strBuff.length());
                     throw new ChannelException("ProcessEvent error can't write event to channel.");
                 }
@@ -548,7 +541,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg == null) {
             logger.error("Get null msg, just skip!");
-            this.addMetric(false, 0, null);
             return;
         }
         ByteBuf cb = (ByteBuf) msg;
@@ -558,7 +550,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             int len = cb.readableBytes();
             if (len == 0 && this.filterEmptyMsg) {
                 logger.warn("Get empty msg from {}, just skip!", strRemoteIP);
-                this.addMetric(false, 0, null);
                 return;
             }
             // parse message
@@ -569,12 +560,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                         strRemoteIP, msgRcvTime, remoteChannel);
                 if (resultMap == null || resultMap.isEmpty()) {
                     logger.info("Parse message result is null, from {}", strRemoteIP);
-                    this.addMetric(false, 0, null);
                     return;
                 }
             } catch (MessageIDException ex) {
                 logger.error("MessageIDException ex = {}", ex);
-                this.addMetric(false, 0, null);
                 throw new IOException(ex.getCause());
             }
             // process message by msgType
@@ -583,12 +572,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                 ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5);
                 heartbeatBuffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
                 remoteChannel.writeAndFlush(heartbeatBuffer);
-                this.addMetric(false, 0, null);
                 return;
             }
             // process heart beat 8
             if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
-                this.addMetric(false, 0, null);
                 return;
             }
             // process data message
@@ -616,10 +603,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                         }
                         try {
                             processor.processEvent(event);
-                            this.addMetric(true, body.length, event);
+                            this.addStatistics(true, body.length, event);
                         } catch (Throwable ex) {
                             logger.error("Error writing to controller,data will discard.", ex);
-                            this.addMetric(false, body.length, event);
+                            this.addStatistics(false, body.length, event);
                             throw new ChannelException(
                                     "Process Controller Event error can't write event to channel.");
                         }
@@ -640,10 +627,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                         }
                         try {
                             processor.processEvent(event);
-                            this.addMetric(true, body.length, event);
+                            this.addStatistics(true, body.length, event);
                         } catch (Throwable ex) {
                             logger.error("Error writing to controller,data will discard.", ex);
-                            this.addMetric(false, body.length, event);
+                            this.addStatistics(false, body.length, event);
                             throw new ChannelException(
                                     "Process Controller Event error can't write event to channel.");
                         }
@@ -677,31 +664,19 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     * addMetric
+     * add statistics information
      *
-     * @param result
-     * @param size
-     * @param event
+     * @param isSuccess  success or failure
+     * @param size    message size
+     * @param event   message event
      */
-    private void addMetric(boolean result, long size, Event event) {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
-        dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, source.getName());
-        dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, source.getName());
-        DataProxyMetricItem.fillInlongId(event, dimensions);
-        DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
-        DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
-        if (result) {
-            metricItem.readSuccessCount.incrementAndGet();
-            metricItem.readSuccessSize.addAndGet(size);
-            try {
-                AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event);
-            } catch (Exception e) {
-                logger.error("add metric has exception e= {}", e);
-            }
-        } else {
-            metricItem.readFailCount.incrementAndGet();
-            metricItem.readFailSize.addAndGet(size);
+    private void addStatistics(boolean isSuccess, long size, Event event) {
+        if (event == null) {
+            return;
+        }
+        this.metricItemSet.fillSrcMetricItemsByEvent(event, isSuccess, size);
+        if (isSuccess) {
+            AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event);
         }
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 1e2d54405..b101b9651 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -48,7 +48,6 @@ import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.event.EventBuilder;
-import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.dataproxy.base.ProxyMessage;
 import org.apache.inlong.dataproxy.config.ConfigManager;
@@ -87,7 +86,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
             = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
     private static final ZoneId defZoneId = ZoneId.systemDefault();
 
-    private AbstractSource source;
+    private BaseSource source;
     private final ChannelGroup allChannels;
     private int maxConnections = Integer.MAX_VALUE;
     private boolean filterEmptyMsg = false;
@@ -112,7 +111,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
      * @param isCompressed
      * @param protocolType
      */
-    public SimpleMessageHandler(AbstractSource source, ServiceDecoder serProcessor,
+    public SimpleMessageHandler(BaseSource source, ServiceDecoder serProcessor,
             ChannelGroup allChannels,
             String topic, String attr, Boolean filterEmptyMsg, Integer maxMsgLength,
             Integer maxCons,
@@ -130,11 +129,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
         this.isCompressed = isCompressed;
         this.maxConnections = maxCons;
         this.protocolType = protocolType;
-        if (source instanceof SimpleTcpSource) {
-            this.metricItemSet = ((SimpleTcpSource) source).getMetricItemSet();
-        } else {
-            this.metricItemSet = new DataProxyMetricItemSet(this.toString());
-        }
+        this.metricItemSet = source.getMetricItemSet();
     }
 
     private String getRemoteIp(Channel channel) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index 1f1a86e66..734fd95dd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -39,9 +39,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flume.Context;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.conf.Configurable;
-import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.utils.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,8 +81,6 @@ public class SimpleTcpSource extends BaseSource
 
     private ServerBootstrap bootstrap;
 
-    private DataProxyMetricItemSet metricItemSet;
-
     public SimpleTcpSource() {
         super();
 
@@ -182,8 +178,6 @@ public class SimpleTcpSource extends BaseSource
     @Override
     public synchronized void startSource() {
         logger.info("start " + this.getName());
-        this.metricItemSet = new DataProxyMetricItemSet(this.getName());
-        MetricRegister.register(metricItemSet);
         checkBlackListThread = new CheckBlackListThread();
         checkBlackListThread.start();
 //        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
@@ -270,14 +264,6 @@ public class SimpleTcpSource extends BaseSource
         }
     }
 
-    /**
-     * get metricItemSet
-     * @return the metricItemSet
-     */
-    public DataProxyMetricItemSet getMetricItemSet() {
-        return metricItemSet;
-    }
-
     @Override
     public String getProtocolName() {
         return "tcp";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
index 5c17451d3..be42b4494 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -50,6 +50,7 @@ public class SimpleUdpSource
     @Override
     public void startSource() {
         // setup Netty server
+        logger.info("start " + this.getName());
         bootstrap = new Bootstrap();
         logger.info("Set max workers : {} ;",maxThreads);
         bootstrap.channel(NioDatagramChannel.class);