Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/19 11:30:15 UTC

[inlong] 02/02: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive (#5906)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit eb00535bf0ac9fff42c58aebeb22b05b9b899829
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Mon Sep 19 19:25:03 2022 +0800

    [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive (#5906)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../inlong/sort/configuration/Constants.java       |  22 +++-
 .../apache/inlong/sort/protocol/InlongMetric.java  |  22 +---
 .../org/apache/inlong/sort/base/Constants.java     |  11 +-
 .../apache/inlong/sort/base/metric/MetricData.java |  35 ++++-
 .../inlong/sort/base/metric/MetricOption.java      | 141 ++++++++++++++++-----
 .../inlong/sort/base/metric/SinkMetricData.java    | 128 +++++++++++--------
 .../inlong/sort/base/metric/SourceMetricData.java  | 105 +++++++--------
 .../sort/base/util/ValidateMetricOptionUtils.java  |  39 ------
 .../sort/elasticsearch/ElasticsearchSinkBase.java  |  26 ++--
 .../table/RowElasticsearchSinkFunction.java        |  55 ++------
 .../sort/filesystem/FileSystemTableSink.java       |   2 -
 .../filesystem/stream/AbstractStreamingWriter.java |  25 ++--
 .../sort/hbase/HBase2DynamicTableFactory.java      |   2 -
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  |  46 +++----
 .../hive/filesystem/AbstractStreamingWriter.java   |  25 ++--
 .../iceberg/flink/FlinkDynamicTableFactory.java    |  11 +-
 .../sort/iceberg/flink/IcebergTableSink.java       |   2 +-
 .../inlong/sort/iceberg/flink/sink/FlinkSink.java  |   2 +-
 .../iceberg/flink/sink/IcebergStreamWriter.java    |   7 -
 .../inlong/sort/iceberg/IcebergTableSink.java      |   2 +-
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |   2 +-
 .../sort/iceberg/sink/IcebergStreamWriter.java     |  25 ++--
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |  76 +++--------
 .../inlong/sort/kafka/FlinkKafkaConsumerBase.java  |  52 +++-----
 .../inlong/sort/kafka/FlinkKafkaProducer.java      |  48 ++-----
 .../table/DynamicKafkaDeserializationSchema.java   |   8 +-
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   |  32 ++---
 .../mongodb/table/MongoDBTableSourceFactory.java   |   2 -
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |  34 ++---
 .../inlong/sort/cdc/mysql/source/MySqlSource.java  |  36 ++----
 .../source/metrics/MySqlSourceReaderMetrics.java   | 124 ++----------------
 .../mysql/table/MySqlTableInlongSourceFactory.java |   2 -
 .../sort/cdc/oracle/DebeziumSourceFunction.java    |  32 ++---
 .../cdc/oracle/table/OracleTableSourceFactory.java |   2 -
 .../DebeziumSourceFunction.java                    |  43 ++-----
 .../cdc/postgres/table/PostgreSQLTableFactory.java |   2 -
 .../apache/inlong/sort/pulsar/table/Constants.java |  32 -----
 .../table/DynamicPulsarDeserializationSchema.java  |  41 ++----
 .../pulsar/table/PulsarDynamicTableFactory.java    |   2 +-
 .../table/UpsertPulsarDynamicTableFactory.java     |   2 +-
 .../sqlserver/table/DebeziumSourceFunction.java    |  56 ++------
 .../inlong/sort/parser/impl/FlinkSqlParser.java    |  25 ++--
 42 files changed, 515 insertions(+), 871 deletions(-)
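
The per-connector diffs below all converge on the same construction path: build a MetricOption through its new builder and hand it to SinkMetricData or SourceMetricData, instead of splitting the raw `groupId&streamId&nodeId` string in every connector. A minimal sketch of that flow inside a RichSinkFunction-style open() method (label and audit values here are placeholders, not taken from this commit):

    MetricOption metricOption = MetricOption.builder()
            .withInlongLabels("groupId=g1&streamId=s1&nodeId=n1")   // placeholder labels
            .withInlongAudit("127.0.0.1:10081")                     // optional audit proxy
            .withRegisterMetric(MetricOption.RegisteredMetric.ALL)  // ALL / NORMAL / DIRTY
            .build();                                               // null when neither labels nor audit are set
    if (metricOption != null) {
        sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
    }

Parsing, validation, and the decision of which counters to register now live in MetricOption and the metric data classes rather than in each connector.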

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index b0979c62f..e4d6a3906 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -60,6 +60,12 @@ public class Constants {
 
     public static final String HIVE_SINK_ORC_PREFIX = HIVE_SINK_PREFIX + "orc.";
 
+    public static final String GROUP_ID = "groupId";
+
+    public static final String STREAM_ID = "streamId";
+
+    public static final String NODE_ID = "nodeId";
+
     // ------------------------------------------------------------------------
     //  Operator uid
     // ------------------------------------------------------------------------
@@ -275,10 +281,18 @@ public class Constants {
             .defaultValue(5)
             .withDescription("minutes");
 
-    public static final ConfigOption<String> METRICS_AUDIT_PROXY_HOSTS = key("metrics.audit.proxy.hosts")
-            .noDefaultValue()
-            .withDescription("Audit proxy host address for reporting audit metrics. "
-                    + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+    public static final ConfigOption<String> METRICS_LABELS =
+            ConfigOptions.key("inlong.metric.labels")
+                    .noDefaultValue()
+                    .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
+                            + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
+
+
+    public static final ConfigOption<String> METRICS_AUDIT_PROXY_HOSTS =
+            ConfigOptions.key("metrics.audit.proxy.hosts")
+                    .noDefaultValue()
+                    .withDescription("Audit proxy host address for reporting audit metrics. \n"
+                            + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
     // ------------------------------------------------------------------------
     //  Single tenant related
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
index 4afb46a89..b90a38ccb 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
@@ -19,29 +19,11 @@ package org.apache.inlong.sort.protocol;
 
 /**
  * The class is the abstract of Inlong Metric.
- * We agree that the key of the inlong metric report is `inlong.metric`,
+ * We agree that the key of the inlong metric report is
+ * {@link org.apache.inlong.sort.configuration.Constants#METRICS_GROUP_STREAM_NODE},
  * and its value is format by `groupId&streamId&nodeId`.
  * If node implements this interface, we will inject the key and value into the corresponding Sort-Connectors
  * during flink sql parser
  */
 public interface InlongMetric {
-
-    /**
-     * The key of metric, it must be `inlong.metric` here.
-     */
-    String METRIC_KEY = "inlong.metric";
-
-    /**
-     * The value format, it must be `groupId&streamId&nodeId` here.
-     * The groupId is the id of Inlong Group
-     * The streamId is the id of Inlong Stream
-     * The nodeId is the id of Inlong Source or Sink
-     */
-    String METRIC_VALUE_FORMAT = "%s&%s&%s";
-
-    /**
-     * The key of InLong audit, the value should be ip:port&ip:port
-     */
-    String AUDIT_KEY = "inlong.audit";
-
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 18ff408f2..679bcc6c3 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -86,17 +86,18 @@ public final class Constants {
     public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
 
     public static final ConfigOption<String> INLONG_METRIC =
-        ConfigOptions.key("inlong.metric")
+        ConfigOptions.key("inlong.metric.labels")
             .stringType()
             .noDefaultValue()
-            .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
+            .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
+                    + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
 
     public static final ConfigOption<String> INLONG_AUDIT =
-        ConfigOptions.key("inlong.audit")
+        ConfigOptions.key("metrics.audit.proxy.hosts")
             .stringType()
             .noDefaultValue()
-            .withDescription("INLONG AUDIT HOST + '&' + PORT");
+            .withDescription("Audit proxy host address for reporting audit metrics. \n"
+                    + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
     public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
             ConfigOptions.key("sink.ignore.changelog")
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
index 681318ff7..381fa5ac4 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
@@ -23,6 +23,8 @@ import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 
+import java.util.Map;
+
 import static org.apache.inlong.sort.base.Constants.GROUP_ID;
 import static org.apache.inlong.sort.base.Constants.NODE_ID;
 import static org.apache.inlong.sort.base.Constants.STREAM_ID;
@@ -40,26 +42,39 @@ public interface MetricData {
      */
     MetricGroup getMetricGroup();
 
+    /**
+     * Get labels
+     *
+     * @return The labels defined in inlong
+     */
+    Map<String, String> getLabels();
+
     /**
      * Get group id
      *
      * @return The group id defined in inlong
      */
-    String getGroupId();
+    default String getGroupId() {
+        return getLabels().get(GROUP_ID);
+    }
 
     /**
      * Get stream id
      *
      * @return The stream id defined in inlong
      */
-    String getStreamId();
+    default String getStreamId() {
+        return getLabels().get(STREAM_ID);
+    }
 
     /**
      * Get node id
      *
      * @return The node id defined in inlong
      */
-    String getNodeId();
+    default String getNodeId() {
+        return getLabels().get(NODE_ID);
+    }
 
     /**
      * Register a counter metric
@@ -69,8 +84,11 @@ public interface MetricData {
      * @return Counter of registered
      */
     default Counter registerCounter(String metricName, Counter counter) {
-        return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId())
-                .addGroup(NODE_ID, getNodeId()).counter(metricName, counter);
+        MetricGroup inlongMetricGroup = getMetricGroup();
+        for (Map.Entry<String, String> label : getLabels().entrySet()) {
+            inlongMetricGroup = inlongMetricGroup.addGroup(label.getKey(), label.getValue());
+        }
+        return inlongMetricGroup.counter(metricName, counter);
     }
 
     /**
@@ -90,8 +108,11 @@ public interface MetricData {
      * @return Meter of registered
      */
     default Meter registerMeter(String metricName, Counter counter) {
-        return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId())
-                .addGroup(NODE_ID, getNodeId()).meter(metricName, new MeterView(counter, TIME_SPAN_IN_SECONDS));
+        MetricGroup inlongMetricGroup = getMetricGroup();
+        for (Map.Entry<String, String> label : getLabels().entrySet()) {
+            inlongMetricGroup = inlongMetricGroup.addGroup(label.getKey(), label.getValue());
+        }
+        return inlongMetricGroup.meter(metricName, new MeterView(counter, TIME_SPAN_IN_SECONDS));
     }
 
 }
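
The default methods above replace the three fixed addGroup calls with one nesting level per label, so the existing groupId/streamId/nodeId hierarchy is preserved and any extra labels simply add further levels. For a hypothetical label map of {groupId=g1, streamId=s1, nodeId=n1}, registerCounter expands to the equivalent of:

    MetricGroup group = getMetricGroup()
            .addGroup("groupId", "g1")
            .addGroup("streamId", "s1")
            .addGroup("nodeId", "n1");
    Counter numRecordsOut = group.counter("numRecordsOut", new SimpleCounter());
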
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index d2179ae54..f4c679f9c 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -19,13 +19,19 @@
 package org.apache.inlong.sort.base.metric;
 
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
+import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.GROUP_ID;
+import static org.apache.inlong.sort.base.Constants.STREAM_ID;
 
 public class MetricOption {
     private static final String IP_OR_HOST_PORT = "^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
@@ -35,55 +41,126 @@ public class MetricOption {
             + "3}|65[0-4]\\d{"
             + "2}|655[0-2]\\d|6553[0-5])$";
 
-    private final String groupId;
-    private final String streamId;
-    private final String nodeId;
+    private Map<String, String> labels;
     private final HashSet<String> ipPortList;
-    private String ipPorts;
+    private Optional<String> ipPorts;
+    private RegisteredMetric registeredMetric;
+    private long initRecords;
+    private long initBytes;
 
-    public MetricOption(String inLongMetric) {
-        this(inLongMetric, null);
-    }
+    private MetricOption(
+            String inlongLabels,
+            @Nullable String inlongAudit,
+            RegisteredMetric registeredMetric,
+            long initRecords,
+            long initBytes) {
+        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
+                "Inlong labels must be set for register metric.");
 
-    public MetricOption(String inLongMetric, @Nullable String inLongAudit) {
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inLongMetric, inLongAudit);
-        String[] inLongMetricArray = inLongMetric.split(DELIMITER);
-        Preconditions.checkArgument(inLongMetricArray.length == 3,
-                "Error inLong metric format: " + inLongMetric);
-        this.groupId = inLongMetricArray[0];
-        this.streamId = inLongMetricArray[1];
-        this.nodeId = inLongMetricArray[2];
-        this.ipPortList = new HashSet<>();
-        this.ipPorts = null;
+        this.initRecords = initRecords;
+        this.initBytes = initBytes;
+        this.labels = new LinkedHashMap<>();
+        String[] inLongLabelArray = inlongLabels.split(DELIMITER);
+        Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")),
+                "InLong metric label format must be xxx=xxx");
+        Stream.of(inLongLabelArray).forEach(label -> {
+            String key = label.substring(0, label.indexOf('='));
+            String value = label.substring(label.indexOf('=') + 1);
+            labels.put(key, value);
+        });
 
-        if (inLongAudit != null) {
-            String[] ipPortStrs = inLongAudit.split(DELIMITER);
-            this.ipPorts = inLongAudit;
+        this.ipPortList = new HashSet<>();
+        this.ipPorts = Optional.ofNullable(inlongAudit);
+        if (ipPorts.isPresent()) {
+            Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID),
+                    "groupId and streamId must be set when enable inlong audit collect.");
+            String[] ipPortStrs = inlongAudit.split(DELIMITER);
             for (String ipPort : ipPortStrs) {
                 Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT, ipPort),
-                        "Error inLong audit format: " + inLongAudit);
+                        "Error inLong audit format: " + inlongAudit);
                 this.ipPortList.add(ipPort);
             }
         }
-    }
 
-    public String getGroupId() {
-        return groupId;
-    }
-
-    public String getStreamId() {
-        return streamId;
+        if (registeredMetric != null) {
+            this.registeredMetric = registeredMetric;
+        }
     }
 
-    public String getNodeId() {
-        return nodeId;
+    public Map<String, String> getLabels() {
+        return labels;
     }
 
     public HashSet<String> getIpPortList() {
         return ipPortList;
     }
 
-    public String getIpPorts() {
+    public Optional<String> getIpPorts() {
         return ipPorts;
     }
+
+    public RegisteredMetric getRegisteredMetric() {
+        return registeredMetric;
+    }
+
+    public long getInitRecords() {
+        return initRecords;
+    }
+
+    public long getInitBytes() {
+        return initBytes;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public enum RegisteredMetric {
+        ALL,
+        NORMAL,
+        DIRTY
+    }
+
+    public static class Builder {
+        private String inlongLabels;
+        private String inlongAudit;
+        private RegisteredMetric registeredMetric = RegisteredMetric.ALL;
+        private long initRecords = 0L;
+        private long initBytes = 0L;
+
+        private Builder() {
+        }
+
+        public MetricOption.Builder withInlongLabels(String inlongLabels) {
+            this.inlongLabels = inlongLabels;
+            return this;
+        }
+
+        public MetricOption.Builder withInlongAudit(String inlongAudit) {
+            this.inlongAudit = inlongAudit;
+            return this;
+        }
+
+        public MetricOption.Builder withRegisterMetric(RegisteredMetric registeredMetric) {
+            this.registeredMetric = registeredMetric;
+            return this;
+        }
+
+        public MetricOption.Builder withInitRecords(long initRecords) {
+            this.initRecords = initRecords;
+            return this;
+        }
+
+        public MetricOption.Builder withInitBytes(long initBytes) {
+            this.initBytes = initBytes;
+            return this;
+        }
+
+        public MetricOption build() {
+            if (inlongLabels == null && inlongAudit == null) {
+                return null;
+            }
+            return new MetricOption(inlongLabels, inlongAudit, registeredMetric, initRecords, initBytes);
+        }
+    }
 }
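
To make the new MetricOption contract concrete, a short illustrative snippet (not part of the commit; values are placeholders): labels are '&'-separated key=value pairs, supplying an audit address additionally requires groupId and streamId among the labels, and build() returns null when neither labels nor audit are configured, which is how connectors decide to skip metric registration entirely.

    MetricOption option = MetricOption.builder()
            .withInlongLabels("groupId=g1&streamId=s1&nodeId=n1")
            .withInlongAudit("127.0.0.1:10081")
            .withRegisterMetric(MetricOption.RegisteredMetric.NORMAL)
            .build();
    option.getLabels();       // {groupId=g1, streamId=s1, nodeId=n1}, insertion order kept
    option.getIpPortList();   // [127.0.0.1:10081]

    MetricOption none = MetricOption.builder().build();   // null: nothing to register
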
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 4073ddd44..d065496e4 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -25,12 +25,9 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
 
-import javax.annotation.Nullable;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.Map;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
@@ -45,10 +42,8 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
  */
 public class SinkMetricData implements MetricData {
 
-    private final MetricGroup metricGroup;
-    private final String groupId;
-    private final String streamId;
-    private final String nodeId;
+    private MetricGroup metricGroup;
+    private Map<String, String> labels;
     private AuditImp auditImp;
     private Counter numRecordsOut;
     private Counter numBytesOut;
@@ -59,23 +54,46 @@ public class SinkMetricData implements MetricData {
     private Meter numRecordsOutPerSecond;
     private Meter numBytesOutPerSecond;
 
-    public SinkMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
-        this(groupId, streamId, nodeId, metricGroup, null);
-    }
-
     public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
-        this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
-    }
-
-    public SinkMetricData(
-            String groupId, String streamId, String nodeId, MetricGroup metricGroup,
-            @Nullable String auditHostAndPorts) {
         this.metricGroup = metricGroup;
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+        this.labels = option.getLabels();
+
+        ThreadSafeCounter recordsOutCounter = new ThreadSafeCounter();
+        ThreadSafeCounter bytesOutCounter = new ThreadSafeCounter();
+        switch (option.getRegisteredMetric()) {
+            case DIRTY:
+                registerMetricsForDirtyBytes(new ThreadSafeCounter());
+                registerMetricsForDirtyRecords(new ThreadSafeCounter());
+                break;
+            case NORMAL:
+                registerMetricsForNumBytesOut(new ThreadSafeCounter());
+                registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+                registerMetricsForNumBytesOutPerSecond();
+                registerMetricsForNumRecordsOutPerSecond();
+
+                recordsOutCounter.inc(option.getInitRecords());
+                bytesOutCounter.inc(option.getInitBytes());
+                registerMetricsForNumRecordsOutForMeter(recordsOutCounter);
+                registerMetricsForNumRecordsOutForMeter(bytesOutCounter);
+                break;
+            default:
+                registerMetricsForDirtyBytes(new ThreadSafeCounter());
+                registerMetricsForDirtyRecords(new ThreadSafeCounter());
+                registerMetricsForNumBytesOut(new ThreadSafeCounter());
+                registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+                registerMetricsForNumBytesOutPerSecond();
+                registerMetricsForNumRecordsOutPerSecond();
+
+                recordsOutCounter.inc(option.getInitRecords());
+                bytesOutCounter.inc(option.getInitBytes());
+                registerMetricsForNumRecordsOutForMeter(recordsOutCounter);
+                registerMetricsForNumRecordsOutForMeter(bytesOutCounter);
+                break;
+
+        }
+
+        if (option.getIpPorts().isPresent()) {
+            AuditImp.getInstance().setAuditProxy(option.getIpPortList());
             this.auditImp = AuditImp.getInstance();
         }
     }
@@ -218,18 +236,8 @@ public class SinkMetricData implements MetricData {
     }
 
     @Override
-    public String getGroupId() {
-        return groupId;
-    }
-
-    @Override
-    public String getStreamId() {
-        return streamId;
-    }
-
-    @Override
-    public String getNodeId() {
-        return nodeId;
+    public Map<String, String> getLabels() {
+        return labels;
     }
 
     public Counter getNumRecordsOutForMeter() {
@@ -242,26 +250,26 @@ public class SinkMetricData implements MetricData {
 
     public void invokeWithEstimate(Object o) {
         long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
-        this.numRecordsOut.inc();
-        this.numBytesOut.inc(size);
-        this.numRecordsOutForMeter.inc();
-        this.numBytesOutForMeter.inc(size);
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_OUTPUT,
-                    getGroupId(),
-                    getStreamId(),
-                    System.currentTimeMillis(),
-                    1,
-                    size);
-        }
+        invoke(1, size);
     }
 
     public void invoke(long rowCount, long rowSize) {
-        this.numRecordsOut.inc(rowCount);
-        this.numBytesOut.inc(rowSize);
-        this.numRecordsOutForMeter.inc(rowCount);
-        this.numBytesOutForMeter.inc(rowSize);
+        if (numRecordsOut != null) {
+            numRecordsOut.inc(rowCount);
+        }
+
+        if (numBytesOut != null) {
+            numBytesOut.inc(rowSize);
+        }
+
+        if (numRecordsOutForMeter != null) {
+            numRecordsOutForMeter.inc(rowCount);
+        }
+
+        if (numBytesOutForMeter != null) {
+            numBytesOutForMeter.inc(rowCount);
+        }
+
         if (auditImp != null) {
             auditImp.add(
                     Constants.AUDIT_SORT_OUTPUT,
@@ -273,12 +281,22 @@ public class SinkMetricData implements MetricData {
         }
     }
 
+    public void invokeDirty(long rowCount, long rowSize) {
+        if (dirtyRecords != null) {
+            dirtyRecords.inc(rowCount);
+        }
+
+        if (dirtyBytes != null) {
+            dirtyBytes.inc(rowSize);
+        }
+    }
+
     @Override
     public String toString() {
         return "SinkMetricData{"
-                + "groupId='" + groupId + '\''
-                + ", streamId='" + streamId + '\''
-                + ", nodeId='" + nodeId + '\''
+                + "metricGroup=" + metricGroup
+                + ", labels=" + labels
+                + ", auditImp=" + auditImp
                 + ", numRecordsOut=" + numRecordsOut.getCount()
                 + ", numBytesOut=" + numBytesOut.getCount()
                 + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 5c25fcc75..3cffcfe54 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -25,12 +25,9 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
 
-import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
 
-import java.util.Arrays;
-import java.util.HashSet;
-
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
@@ -43,46 +40,39 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
  */
 public class SourceMetricData implements MetricData {
 
-    private final MetricGroup metricGroup;
-    private final String groupId;
-    private final String streamId;
-    private final String nodeId;
+    private MetricGroup metricGroup;
+    private Map<String, String> labels;
     private Counter numRecordsIn;
     private Counter numBytesIn;
     private Counter numRecordsInForMeter;
     private Counter numBytesInForMeter;
     private Meter numRecordsInPerSecond;
     private Meter numBytesInPerSecond;
-    private final AuditImp auditImp;
-
-    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
-        this(groupId, streamId, nodeId, metricGroup, (AuditImp) null);
-    }
+    private AuditImp auditImp;
 
     public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
-        this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
-    }
-
-    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
-            AuditImp auditImp) {
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
         this.metricGroup = metricGroup;
-        this.auditImp = auditImp;
-    }
+        this.labels = option.getLabels();
+
+        SimpleCounter recordsInCounter = new SimpleCounter();
+        SimpleCounter bytesInCounter = new SimpleCounter();
+        switch (option.getRegisteredMetric()) {
+            default:
+                registerMetricsForNumRecordsIn();
+                registerMetricsForNumBytesIn();
+                registerMetricsForNumBytesInPerSecond();
+                registerMetricsForNumRecordsInPerSecond();
+
+                recordsInCounter.inc(option.getInitRecords());
+                bytesInCounter.inc(option.getInitBytes());
+                registerMetricsForNumBytesInForMeter(recordsInCounter);
+                registerMetricsForNumRecordsInForMeter(bytesInCounter);
+                break;
+        }
 
-    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
-            @Nullable String auditHostAndPorts) {
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
-        this.metricGroup = metricGroup;
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+        if (option.getIpPorts() != null) {
+            AuditImp.getInstance().setAuditProxy(option.getIpPortList());
             this.auditImp = AuditImp.getInstance();
-        } else {
-            this.auditImp = null;
         }
     }
 
@@ -196,26 +186,32 @@ public class SourceMetricData implements MetricData {
     }
 
     @Override
-    public String getGroupId() {
-        return groupId;
-    }
-
-    @Override
-    public String getStreamId() {
-        return streamId;
+    public Map<String, String> getLabels() {
+        return labels;
     }
 
-    @Override
-    public String getNodeId() {
-        return nodeId;
+    public void outputMetricsWithEstimate(Object o) {
+        long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
+        outputMetrics(1, size);
     }
 
     public void outputMetrics(long rowCountSize, long rowDataSize) {
-        outputMetricForFlink(rowCountSize, rowDataSize);
-        outputMetricForAudit(rowCountSize, rowDataSize);
-    }
+        if (numRecordsIn != null) {
+            this.numRecordsIn.inc(rowCountSize);
+        }
+
+        if (numBytesIn != null) {
+            this.numBytesIn.inc(rowDataSize);
+        }
+
+        if (numRecordsInForMeter != null) {
+            this.numRecordsInForMeter.inc(rowCountSize);
+        }
+
+        if (numBytesInForMeter != null) {
+            this.numBytesInForMeter.inc(rowDataSize);
+        }
 
-    public void outputMetricForAudit(long rowCountSize, long rowDataSize) {
         if (auditImp != null) {
             auditImp.add(
                     Constants.AUDIT_SORT_INPUT,
@@ -227,25 +223,18 @@ public class SourceMetricData implements MetricData {
         }
     }
 
-    public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
-        this.numBytesIn.inc(rowDataSize);
-        this.numRecordsIn.inc(rowCountSize);
-        this.numBytesInForMeter.inc(rowDataSize);
-        this.numRecordsInForMeter.inc(rowCountSize);
-    }
-
     @Override
     public String toString() {
         return "SourceMetricData{"
-                + "groupId='" + groupId + '\''
-                + ", streamId='" + streamId + '\''
-                + ", nodeId='" + nodeId + '\''
+                + "metricGroup=" + metricGroup
+                + ", labels=" + labels
                 + ", numRecordsIn=" + numRecordsIn.getCount()
                 + ", numBytesIn=" + numBytesIn.getCount()
                 + ", numRecordsInForMeter=" + numRecordsInForMeter.getCount()
                 + ", numBytesInForMeter=" + numBytesInForMeter.getCount()
                 + ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
                 + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
+                + ", auditImp=" + auditImp
                 + '}';
     }
 }
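
The source side mirrors the sink: the CDC and Kafka sources later in this commit build a SourceMetricData in open() and then report per record. A sketch, assuming metricOption was built as shown earlier and record is whatever object the source emits:

    SourceMetricData sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
    // Reports one record plus the UTF-8 byte length of record.toString(), and an
    // audit event when an audit proxy address was configured in the MetricOption.
    sourceMetricData.outputMetricsWithEstimate(record);
    // Or, when exact counts are already known:
    sourceMetricData.outputMetrics(rowCount, byteSize);
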
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
deleted file mode 100644
index bd58e31ae..000000000
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.inlong.sort.base.util;
-
-import org.apache.flink.table.api.ValidationException;
-
-/**
- * validate option tool
- */
-public class ValidateMetricOptionUtils {
-
-    /**
-     * validate inlong metric when set inlong audit
-     * @param inlongMetric inlong.metric option value
-     * @param inlongAudit inlong.audit option value
-     */
-    public static void validateInlongMetricIfSetInlongAudit(String inlongMetric, String inlongAudit) {
-        if (inlongAudit != null && inlongMetric == null) {
-            throw new ValidationException("inlong metric is necessary when set inlong audit");
-        }
-    }
-
-}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 000e1c23a..cdea07fa5 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.DocWriteRequest;
@@ -50,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * Base class for all Flink Elasticsearch Sinks.
@@ -266,15 +267,14 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
     @Override
     public void open(Configuration parameters) throws Exception {
         client = callBridge.createClient(userConfig);
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
-            sinkMetricData.registerMetricsForDirtyBytes();
-            sinkMetricData.registerMetricsForDirtyRecords();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withRegisterMetric(RegisteredMetric.DIRTY)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
+
         callBridge.verifyClientConnection(client);
         bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
         requestIndexer =
@@ -482,8 +482,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
                         if (failure != null) {
                             restStatus = itemResponse.getFailure().getStatus();
                             actionRequest = request.requests().get(i);
-                            if (sinkMetricData.getDirtyRecords() != null) {
-                                sinkMetricData.getDirtyRecords().inc();
+                            if (sinkMetricData != null) {
+                                sinkMetricData.invokeDirty(1, 0);
                             }
                             if (restStatus == null) {
                                 if (actionRequest instanceof ActionRequest) {
@@ -526,8 +526,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
         public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
             try {
                 for (DocWriteRequest writeRequest : request.requests()) {
-                    if (sinkMetricData.getDirtyRecords() != null) {
-                        sinkMetricData.getDirtyRecords().inc();
+                    if (sinkMetricData != null) {
+                        sinkMetricData.invokeDirty(1, 0);
                     }
                     if (writeRequest instanceof ActionRequest) {
                         failureHandler.onFailure(
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 0ae93231d..e4f1419c5 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -20,8 +20,8 @@ package org.apache.inlong.sort.elasticsearch.table;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.table.api.TableException;
@@ -37,13 +37,9 @@ import org.elasticsearch.common.xcontent.XContentType;
 
 import javax.annotation.Nullable;
 
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Objects;
 import java.util.function.Function;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
 /** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. */
 public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
 
@@ -63,11 +59,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     private transient  RuntimeContext runtimeContext;
 
     private SinkMetricData sinkMetricData;
-    private Long dataSize = 0L;
-    private Long rowSize = 0L;
-    private String groupId;
-    private String streamId;
-    private transient AuditImp auditImp;
 
     public RowElasticsearchSinkFunction(
             IndexGenerator indexGenerator,
@@ -94,44 +85,20 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     public void open(RuntimeContext ctx) {
         indexGenerator.open();
         this.runtimeContext = ctx;
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            groupId = inlongMetricArray[0];
-            streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
-            sinkMetricData.registerMetricsForNumBytesOut();
-            sinkMetricData.registerMetricsForNumRecordsOut();
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
-        }
-
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
-        }
-    }
-
-    private void outputMetricForAudit(long size) {
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_OUTPUT,
-                    groupId,
-                    streamId,
-                    System.currentTimeMillis(),
-                    1,
-                    size);
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.NORMAL)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
         }
     }
 
     private void sendMetrics(byte[] document) {
-        if (sinkMetricData.getNumBytesOut() != null) {
-            sinkMetricData.getNumBytesOut().inc(document.length);
-        }
-        if (sinkMetricData.getNumRecordsOut() != null) {
-            sinkMetricData.getNumRecordsOut().inc();
+        if (sinkMetricData != null) {
+            sinkMetricData.invoke(1, document.length);
         }
-        outputMetricForAudit(document.length);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
index 60bcf9332..b18e7e32d 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
@@ -77,7 +77,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 import org.apache.inlong.sort.filesystem.stream.StreamingSink;
 
 import javax.annotation.Nullable;
@@ -157,7 +156,6 @@ public class FileSystemTableSink extends AbstractFileSystemTable
         this.configuredParallelism = tableOptions.get(FileSystemOptions.SINK_PARALLELISM);
         this.inlongMetric = tableOptions.get(INLONG_METRIC);
         this.inlongAudit = tableOptions.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 47516a270..95267c698 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -32,10 +32,9 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
-
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send
@@ -102,19 +101,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
     @Override
     public void open() throws Exception {
         super.open();
-        if (inlongMetric != null) {
-            String[] inLongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inLongMetricArray[0];
-            String streamId = inLongMetricArray[1];
-            String nodeId = inLongMetricArray[2];
-            metricData = new SinkMetricData(
-                    groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(), inlongAudit);
-            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond();
-            metricData.registerMetricsForNumRecordsOutPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .withInlongAudit(inlongAudit)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
     }
 
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
index da14c947a..55d5d57d7 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 import org.apache.inlong.sort.hbase.sink.HBaseDynamicTableSink;
 
 import java.util.HashSet;
@@ -106,7 +105,6 @@ public class HBase2DynamicTableFactory
         HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
         String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = tableOptions.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
         return new HBaseDynamicTableSink(
                 tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral, inlongMetric, inlongAudit);
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 907b758ff..a1e9641d2 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -37,9 +37,9 @@ import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,19 +123,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
         org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
         try {
             this.runtimeContext = getRuntimeContext();
-            if (inlongMetric != null && !inlongMetric.isEmpty()) {
-                String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
-                String groupId = inlongMetricArray[0];
-                String streamId = inlongMetricArray[1];
-                String nodeId = inlongMetricArray[2];
-                sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup(),
-                        inlongAudit);
-                sinkMetricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-                sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
+            MetricOption metricOption = MetricOption.builder()
+                    .withInlongLabels(inlongMetric)
+                    .withInlongAudit(inlongAudit)
+                    .withRegisterMetric(RegisteredMetric.ALL)
+                    .build();
+            if (metricOption != null) {
+                sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
             }
             this.mutationConverter.open();
             this.numPendingRequests = new AtomicLong(0);
@@ -163,14 +157,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
                                     }
                                     try {
                                         flush();
-                                        sinkMetricData.invoke(rowSize, dataSize);
+                                        if (sinkMetricData != null) {
+                                            sinkMetricData.invoke(rowSize, dataSize);
+                                        }
                                         resetStateAfterFlush();
                                     } catch (Exception e) {
-                                        if (sinkMetricData.getDirtyRecords() != null) {
-                                            sinkMetricData.getDirtyRecords().inc(rowSize);
-                                        }
-                                        if (sinkMetricData.getDirtyBytes() != null) {
-                                            sinkMetricData.getDirtyBytes().inc(dataSize);
+                                        if (sinkMetricData != null) {
+                                            sinkMetricData.invokeDirty(rowSize, dataSize);
                                         }
                                         resetStateAfterFlush();
                                         // fail the sink and skip the rest of the items
@@ -236,14 +229,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
                 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
             try {
                 flush();
-                sinkMetricData.invoke(rowSize, dataSize);
+                if (sinkMetricData != null) {
+                    sinkMetricData.invoke(rowSize, dataSize);
+                }
                 resetStateAfterFlush();
             } catch (Exception e) {
-                if (sinkMetricData.getDirtyRecords() != null) {
-                    sinkMetricData.getDirtyRecords().inc(rowSize);
-                }
-                if (sinkMetricData.getDirtyBytes() != null) {
-                    sinkMetricData.getDirtyBytes().inc(dataSize);
+                if (sinkMetricData != null) {
+                    sinkMetricData.invokeDirty(rowSize, dataSize);
                 }
                 resetStateAfterFlush();
                 throw e;
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 5d0ca7bf0..ab2e845f2 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -32,12 +32,11 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 import javax.annotation.Nullable;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
 /**
  * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send
  * file and bucket information to downstream.
@@ -111,19 +110,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
     @Override
     public void open() throws Exception {
         super.open();
-        if (inlongMetric != null) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String inlongGroupId = inlongMetricArray[0];
-            String inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SinkMetricData(
-                    inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
-            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond();
-            metricData.registerMetricsForNumRecordsOutPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
     }
 
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
index 35d3a6c76..7840e3ea7 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
@@ -135,12 +135,11 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         Map<String, String> tableProps = catalogTable.getOptions();
         TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
         SyncRewriteDataFilesActionOption compactOption = new SyncRewriteDataFilesActionOption(tableProps);
-        MetricOption metricOption = null;
-        if (tableProps.containsKey(INLONG_METRIC.key())) {
-            metricOption = new MetricOption(
-                    tableProps.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
-                    tableProps.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()));
-        }
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(tableProps.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()))
+                .withInlongAudit(tableProps.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()))
+                .build();
+
         boolean appendMode = tableProps.containsKey(ICEBERG_IGNORE_ALL_CHANGELOG.key())
                 ? Boolean.parseBoolean(tableProps.get(ICEBERG_IGNORE_ALL_CHANGELOG.key()))
                 : ICEBERG_IGNORE_ALL_CHANGELOG.defaultValue();
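
For table factories the containsKey guard around INLONG_METRIC disappears: the factory always calls the builder, and the downstream writer null-checks the built option (see the IcebergStreamWriter hunk below). A hedged sketch of reading the two options straight from catalog table properties; the literal keys stand in for INLONG_METRIC.key() and INLONG_AUDIT.key():

    import java.util.Map;
    import org.apache.inlong.sort.base.metric.MetricOption;

    final class FactoryMetricOptionSketch {
        /** tableProps is the catalog table's option map; either key may be absent. */
        static MetricOption fromTableProps(Map<String, String> tableProps) {
            return MetricOption.builder()
                    .withInlongLabels(tableProps.get("inlong.metric"))
                    .withInlongAudit(tableProps.get("metrics.audit.proxy.hosts"))
                    .build();   // may be null; the stream writer checks before creating SinkMetricData
        }
    }
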
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
index 112b6a36b..df0ced054 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
@@ -43,7 +43,7 @@ import java.util.Map;
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
  * Add a table property `write.compact.enable` to support small file compact.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collecting inlong metrics and audit
  */
 public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
     private static final Logger LOG = LoggerFactory.getLogger(IcebergTableSink.class);
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
index 0b48fb169..c24d5a8c6 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
@@ -74,7 +74,7 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
  * Add a table property `write.compact.enable` to support small file compact.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit.
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collecting inlong metrics and audit.
  */
 public class FlinkSink {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index 9f7cffbcf..bb00b7808 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -30,7 +30,6 @@ import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -76,12 +75,6 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         // Initialize metric
         if (metricOption != null) {
             metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
-            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond();
-            metricData.registerMetricsForNumRecordsOutPerSecond();
         }
     }
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 088622278..37eb2670b 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -46,7 +46,7 @@ import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_A
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collecting inlong metrics and audit
  */
 public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 65929bc34..8662722d8 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -71,7 +71,7 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit.
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collecting inlong metrics and audit.
  */
 public class FlinkSink {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 79df9049f..8318c7177 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -28,14 +28,13 @@ import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
@@ -79,19 +78,13 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         this.writer = taskWriterFactory.create();
 
         // Initialize metric
-        if (inlongMetric != null) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String inlongGroupId = inlongMetricArray[0];
-            String inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SinkMetricData(
-                    inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
-            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond();
-            metricData.registerMetricsForNumRecordsOutPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
     }
 
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 66af78c4d..e32d4094b 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -33,7 +33,8 @@ import org.apache.flink.connector.jdbc.utils.JdbcUtils;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,9 +44,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -53,8 +52,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * A JDBC outputFormat that supports batching records before writing them to the database.
@@ -80,9 +77,6 @@ public class JdbcBatchingOutputFormat<
     private transient RuntimeContext runtimeContext;
 
     private SinkMetricData sinkMetricData;
-    private String inlongGroupId;
-    private String inlongStreamId;
-    private transient AuditImp auditImp;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
 
@@ -130,22 +124,13 @@ public class JdbcBatchingOutputFormat<
     public void open(int taskNumber, int numTasks) throws IOException {
         super.open(taskNumber, numTasks);
         this.runtimeContext = getRuntimeContext();
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            inlongGroupId = inlongMetricArray[0];
-            inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sinkMetricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, runtimeContext.getMetricGroup());
-            sinkMetricData.registerMetricsForDirtyBytes();
-            sinkMetricData.registerMetricsForDirtyRecords();
-            sinkMetricData.registerMetricsForNumBytesOut();
-            sinkMetricData.registerMetricsForNumRecordsOut();
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
-        }
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
         }
         jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
         if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
@@ -159,20 +144,13 @@ public class JdbcBatchingOutputFormat<
                                     if (!closed) {
                                         try {
                                             flush();
-                                            if (sinkMetricData.getNumRecordsOut() != null) {
-                                                sinkMetricData.getNumRecordsOut().inc(rowSize);
-                                            }
-                                            if (sinkMetricData.getNumBytesOut() != null) {
-                                                sinkMetricData.getNumBytesOut()
-                                                        .inc(dataSize);
+                                            if (sinkMetricData != null) {
+                                                sinkMetricData.invoke(rowSize, dataSize);
                                             }
                                             resetStateAfterFlush();
                                         } catch (Exception e) {
-                                            if (sinkMetricData.getDirtyRecords() != null) {
-                                                sinkMetricData.getDirtyRecords().inc(rowSize);
-                                            }
-                                            if (sinkMetricData.getDirtyBytes() != null) {
-                                                sinkMetricData.getDirtyBytes().inc(dataSize);
+                                            if (sinkMetricData != null) {
+                                                sinkMetricData.invokeDirty(rowSize, dataSize);
                                             }
                                             resetStateAfterFlush();
                                             flushException = e;
@@ -203,46 +181,26 @@ public class JdbcBatchingOutputFormat<
         }
     }
 
-    private void outputMetricForAudit(long length) {
-        if (auditImp != null) {
-            auditImp.add(
-                    AUDIT_SORT_INPUT,
-                    inlongGroupId,
-                    inlongStreamId,
-                    System.currentTimeMillis(),
-                    1,
-                    length);
-        }
-    }
-
     @Override
     public final synchronized void writeRecord(In record) throws IOException {
         checkFlushException();
 
         rowSize++;
         dataSize = dataSize + record.toString().getBytes(StandardCharsets.UTF_8).length;
-        outputMetricForAudit(dataSize);
         try {
             addToBatch(record, jdbcRecordExtractor.apply(record));
             batchCount++;
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData.getNumRecordsOut() != null) {
-                    sinkMetricData.getNumRecordsOut().inc(rowSize);
-                }
-                if (sinkMetricData.getNumBytesOut() != null) {
-                    sinkMetricData.getNumBytesOut()
-                            .inc(dataSize);
+                if (sinkMetricData != null) {
+                    sinkMetricData.invoke(rowSize, dataSize);
                 }
                 resetStateAfterFlush();
             }
         } catch (Exception e) {
-            if (sinkMetricData.getDirtyRecords() != null) {
-                sinkMetricData.getDirtyRecords().inc(rowSize);
-            }
-            if (sinkMetricData.getDirtyBytes() != null) {
-                sinkMetricData.getDirtyBytes().inc(dataSize);
+            if (sinkMetricData != null) {
+                sinkMetricData.invokeDirty(rowSize, dataSize);
             }
             resetStateAfterFlush();
             throw new IOException("Writing records to JDBC failed.", e);
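
With the refactoring above, per-batch accounting in the JDBC writer reduces to one call on success and one on failure. A compact sketch of that bookkeeping under the same assumptions (the field names mirror the output format, but the wrapper class and the Runnable stand-in for flush() are illustrative):

    import java.io.IOException;
    import org.apache.inlong.sort.base.metric.SinkMetricData;

    class BatchMetricsSketch {
        private SinkMetricData sinkMetricData;   // null when metrics are not configured
        private long rowSize;                    // rows in the pending batch
        private long dataSize;                   // serialized bytes in the pending batch

        void flushBatch(Runnable flush) throws IOException {
            try {
                flush.run();                                        // write the pending batch
                if (sinkMetricData != null) {
                    sinkMetricData.invoke(rowSize, dataSize);       // successful rows / bytes
                }
            } catch (Exception e) {
                if (sinkMetricData != null) {
                    sinkMetricData.invokeDirty(rowSize, dataSize);  // dirty rows / bytes
                }
                throw new IOException("Writing records to JDBC failed.", e);
            } finally {
                rowSize = 0L;                                       // reset, as resetStateAfterFlush() does
                dataSize = 0L;
            }
        }
    }
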
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
index 0d0ab4544..b655a10cd 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
@@ -19,7 +19,6 @@
 package org.apache.inlong.sort.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -63,10 +62,10 @@ import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWat
 import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -74,10 +73,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -90,7 +87,6 @@ import static org.apache.flink.streaming.connectors.kafka.internals.metrics.Kafk
 import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
@@ -831,36 +827,20 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
     @Override
     public void run(SourceContext<T> sourceContext) throws Exception {
-
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            AuditImp auditImp = null;
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                auditImp = AuditImp.getInstance();
-            }
-            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(),
-                    auditImp);
-            ThreadSafeCounter recordsInCounter = new ThreadSafeCounter();
-            ThreadSafeCounter bytesInCounter = new ThreadSafeCounter();
-            if (metricState != null) {
-                recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
-                bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
-            }
-            sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
-            sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
-            sourceMetricData.registerMetricsForNumRecordsInForMeter(new ThreadSafeCounter());
-            sourceMetricData.registerMetricsForNumBytesInForMeter(new ThreadSafeCounter());
-            sourceMetricData.registerMetricsForNumBytesInPerSecond();
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
-            if (this.deserializer instanceof DynamicKafkaDeserializationSchema) {
-                DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema =
-                        (DynamicKafkaDeserializationSchema) deserializer;
-                dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
-            }
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(inlongAudit)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
+                .build();
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+        if (this.deserializer instanceof DynamicKafkaDeserializationSchema) {
+            DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema =
+                    (DynamicKafkaDeserializationSchema) deserializer;
+            dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
         }
 
         if (subscribedPartitionsToStartOffsets == null) {
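
On the source side the builder also carries the restored counter values, so throughput numbers survive a job restart. A sketch of that construction with the consumer's fields passed in as parameters (only the names taken from the hunk above are real; the wrapper class is illustrative):

    import org.apache.flink.metrics.MetricGroup;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
    import org.apache.inlong.sort.base.metric.MetricState;
    import org.apache.inlong.sort.base.metric.SourceMetricData;
    import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
    import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;

    final class SourceMetricsSketch {
        /** metricState is the snapshot restored from the last checkpoint, or null on a fresh start. */
        static SourceMetricData build(String inlongMetric, String inlongAudit,
                MetricState metricState, MetricGroup group) {
            MetricOption metricOption = MetricOption.builder()
                    .withInlongLabels(inlongMetric)
                    .withInlongAudit(inlongAudit)
                    .withRegisterMetric(RegisteredMetric.ALL)
                    // seed the counters from the last checkpoint, if any
                    .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                    .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
                    .build();
            return metricOption != null ? new SourceMetricData(metricOption, group) : null;
        }
    }
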
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index 3f0902c0c..d3ca629cb 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -57,10 +57,10 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
@@ -97,7 +97,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -243,18 +242,6 @@ public class FlinkKafkaProducer<IN>
      */
     @Nullable
     protected transient volatile Exception asyncException;
-    /**
-     * audit implement
-     */
-    private transient AuditImp auditImp;
-    /**
-     * inLong groupId
-     */
-    private String inlongGroupId;
-    /**
-     * inLong streamId
-     */
-    private String inlongStreamId;
     /**
      * sink metric data
      */
@@ -914,25 +901,15 @@ public class FlinkKafkaProducer<IN>
                     RuntimeContextInitializationContextAdapters.serializationAdapter(
                             getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
         }
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            inlongGroupId = inlongMetricArray[0];
-            inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup(),
-                    auditHostAndPorts);
-            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond();
-            metricData.registerMetricsForNumRecordsOutPerSecond();
-        }
-        if (metricState != null && metricData != null) {
-            metricData.getNumBytesOut().inc(metricState.getMetricValue(NUM_BYTES_OUT));
-            metricData.getNumRecordsOut().inc(metricState.getMetricValue(NUM_RECORDS_OUT));
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, ctx.getMetricGroup());
         }
         super.open(configuration);
     }
@@ -945,8 +922,7 @@ public class FlinkKafkaProducer<IN>
 
     private void sendDirtyMetrics(Long rowSize, Long dataSize) {
         if (metricData != null) {
-            metricData.getDirtyRecords().inc(rowSize);
-            metricData.getDirtyBytes().inc(dataSize);
+            metricData.invokeDirty(rowSize, dataSize);
         }
     }
 
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index c6b5c11a9..13e3d2103 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -120,7 +120,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
         if (keyDeserialization == null && !hasMetadata) {
             valueDeserialization.deserialize(record.value(), collector);
             // output metrics
-            outputMetrics(record);
+            if (metricData != null) {
+                metricData.outputMetrics(1, record.value().length);
+            }
             return;
         }
 
@@ -140,7 +142,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
         } else {
             valueDeserialization.deserialize(record.value(), outputCollector);
             // output metrics
-            outputMetrics(record);
+            if (metricData != null) {
+                metricData.outputMetrics(1, record.value().length);
+            }
         }
 
         keyCollector.buffer.clear();
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 42bb53732..932e249d5 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.spi.OffsetCommitPolicy;
 import io.debezium.heartbeat.Heartbeat;
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -58,7 +57,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -68,9 +68,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,7 +79,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -416,21 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            AuditImp auditImp = null;
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                auditImp = AuditImp.getInstance();
-            }
-            metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
-            metricData.registerMetricsForNumRecordsIn();
-            metricData.registerMetricsForNumBytesIn();
-            metricData.registerMetricsForNumBytesInPerSecond();
-            metricData.registerMetricsForNumRecordsInPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(inlongAudit)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -470,8 +459,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                 if (metricData != null) {
-                                    metricData.outputMetrics(1L,
-                                            record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    metricData.outputMetricsWithEstimate(record.value());
                                 }
                                 deserializer.deserialize(record, out);
                             }
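
Per-record accounting in the CDC sources now goes through outputMetricsWithEstimate(record.value()), which estimates the byte size from the record value instead of each connector re-serializing it to UTF-8. A small sketch of a metered deserialize step in that spirit (the wrapper class and its functional interface are illustrative, not part of the connector):

    import org.apache.flink.util.Collector;
    import org.apache.inlong.sort.base.metric.SourceMetricData;
    import org.apache.kafka.connect.source.SourceRecord;

    final class MeteredDeserializeSketch<T> {
        private final SourceMetricData metricData;   // may be null when metrics are off

        MeteredDeserializeSketch(SourceMetricData metricData) {
            this.metricData = metricData;
        }

        void deserialize(SourceRecord record, Collector<T> out,
                DeserializeFn<T> delegate) throws Exception {
            if (metricData != null) {
                // one record; bytes estimated from the record value by the helper
                metricData.outputMetricsWithEstimate(record.value());
            }
            delegate.deserialize(record, out);
        }

        /** Minimal stand-in for the connector's own deserialization schema. */
        interface DeserializeFn<T> {
            void deserialize(SourceRecord record, Collector<T> out) throws Exception;
        }
    }
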
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index 919c227e4..20f20b65f 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 
 import java.time.ZoneId;
 import java.util.HashSet;
@@ -226,7 +225,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                         : ZoneId.of(zoneId);
         final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         final String inlongAudit = config.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index e7084d2fa..1ac48f97a 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -25,7 +25,6 @@ import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.spi.OffsetCommitPolicy;
 import io.debezium.heartbeat.Heartbeat;
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -47,7 +46,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher;
@@ -66,9 +66,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -77,7 +75,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
 
@@ -416,23 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            AuditImp auditImp = null;
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                auditImp = AuditImp.getInstance();
-            }
-            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
-            sourceMetricData.registerMetricsForNumRecordsIn();
-            sourceMetricData.registerMetricsForNumBytesIn();
-            sourceMetricData.registerMetricsForNumBytesInForMeter();
-            sourceMetricData.registerMetricsForNumRecordsInForMeter();
-            sourceMetricData.registerMetricsForNumBytesInPerSecond();
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(inlongAudit)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
 
         properties.setProperty("name", "engine");
@@ -473,8 +460,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                 if (sourceMetricData != null) {
-                                    sourceMetricData.outputMetrics(1L,
-                                            record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    sourceMetricData.outputMetricsWithEstimate(record.value());
                                 }
                                 deserializer.deserialize(record, out);
                             }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index dcb510044..ea102f429 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.cdc.mysql.source;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -36,8 +35,8 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.mysql.MySqlValidator;
 import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
@@ -62,12 +61,9 @@ import org.apache.inlong.sort.cdc.mysql.table.StartupMode;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.function.Supplier;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.discoverCapturedTables;
 import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection;
 
@@ -142,28 +138,18 @@ public class MySqlSource<T>
         final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext);
         final MySqlSourceReaderMetrics sourceReaderMetrics =
                 new MySqlSourceReaderMetrics(metricGroup);
-        sourceReaderMetrics.registerMetrics();
-        MySqlSourceReaderContext mySqlSourceReaderContext =
-                new MySqlSourceReaderContext(readerContext);
         // create source config for the given subtask (e.g. unique server id)
         MySqlSourceConfig sourceConfig =
                 configFactory.createConfig(readerContext.getIndexOfSubtask());
-        String inlongMetric = sourceConfig.getInlongMetric();
-        String inlongAudit = sourceConfig.getInlongAudit();
-        if (StringUtils.isNotEmpty(inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            sourceReaderMetrics.setInlongGroupId(inlongMetricArray[0]);
-            sourceReaderMetrics.setInlongSteamId(inlongMetricArray[1]);
-            sourceReaderMetrics.setNodeId(inlongMetricArray[2]);
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                sourceReaderMetrics.setAuditImp(AuditImp.getInstance());
-            }
-            sourceReaderMetrics.registerMetricsForNumBytesIn(Constants.NUM_BYTES_IN);
-            sourceReaderMetrics.registerMetricsForNumRecordsIn(Constants.NUM_RECORDS_IN);
-            sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(Constants.NUM_BYTES_IN_PER_SECOND);
-            sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(Constants.NUM_RECORDS_IN_PER_SECOND);
-        }
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(sourceConfig.getInlongMetric())
+                .withInlongAudit(sourceConfig.getInlongAudit())
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        sourceReaderMetrics.registerMetrics(metricOption);
+        MySqlSourceReaderContext mySqlSourceReaderContext =
+                new MySqlSourceReaderContext(readerContext);
+
         FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
                 new FutureCompletingBlockingQueue<>();
         Supplier<MySqlSplitReader> splitReaderSupplier =
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index b301ed572..45c81e560 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -18,13 +18,10 @@
 
 package org.apache.inlong.sort.cdc.mysql.source.metrics;
 
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
 
 /**
@@ -53,56 +50,21 @@ public class MySqlSourceReaderMetrics {
      */
     private volatile long emitDelay = 0L;
 
-    private Counter numRecordsIn;
-    private Counter numBytesIn;
-    private Meter numRecordsInPerSecond;
-    private Meter numBytesInPerSecond;
-    private static Integer TIME_SPAN_IN_SECONDS = 60;
-    private static String STREAM_ID = "streamId";
-    private static String GROUP_ID = "groupId";
-    private static String NODE_ID = "nodeId";
-    private String inlongGroupId;
-    private String inlongSteamId;
-    private String nodeId;
-    private AuditImp auditImp;
+    private SourceMetricData sourceMetricData;
 
     public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
     }
 
-    public void registerMetrics() {
+    public void registerMetrics(MetricOption metricOption) {
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+        }
         metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>) this::getFetchDelay);
         metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>) this::getEmitDelay);
         metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime);
     }
 
-    public void registerMetricsForNumRecordsIn(String metricName) {
-        numRecordsIn =
-                metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
-                        .addGroup(NODE_ID, this.nodeId)
-                        .counter(metricName);
-    }
-
-    public void registerMetricsForNumBytesIn(String metricName) {
-        numBytesIn =
-                metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
-                        .addGroup(NODE_ID, this.nodeId)
-                        .counter(metricName);
-    }
-
-    public void registerMetricsForNumRecordsInPerSecond(String metricName) {
-        numRecordsInPerSecond =
-                metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
-                        .addGroup(NODE_ID, nodeId)
-                        .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS));
-    }
-
-    public void registerMetricsForNumBytesInPerSecond(String metricName) {
-        numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
-                .addGroup(NODE_ID, this.nodeId)
-                .meter(metricName, new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS));
-    }
-
     public long getFetchDelay() {
         return fetchDelay;
     }
@@ -131,77 +93,9 @@ public class MySqlSourceReaderMetrics {
         this.emitDelay = emitDelay;
     }
 
-    public Counter getNumRecordsIn() {
-        return numRecordsIn;
-    }
-
-    public Counter getNumBytesIn() {
-        return numBytesIn;
-    }
-
-    public Meter getNumRecordsInPerSecond() {
-        return numRecordsInPerSecond;
-    }
-
-    public Meter getNumBytesInPerSecond() {
-        return numBytesInPerSecond;
-    }
-
-    public String getInlongGroupId() {
-        return inlongGroupId;
-    }
-
-    public String getInlongSteamId() {
-        return inlongSteamId;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public void setInlongGroupId(String inlongGroupId) {
-        this.inlongGroupId = inlongGroupId;
-    }
-
-    public void setInlongSteamId(String inlongSteamId) {
-        this.inlongSteamId = inlongSteamId;
-    }
-
-    public void setNodeId(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    public AuditImp getAuditImp() {
-        return auditImp;
-    }
-
-    public void setAuditImp(AuditImp auditImp) {
-        this.auditImp = auditImp;
-    }
-
     public void outputMetrics(long rowCountSize, long rowDataSize) {
-        outputMetricForFlink(rowCountSize, rowDataSize);
-        outputMetricForAudit(rowCountSize, rowDataSize);
-    }
-
-    public void outputMetricForAudit(long rowCountSize, long rowDataSize) {
-        if (this.auditImp != null) {
-            this.auditImp.add(
-                    Constants.AUDIT_SORT_INPUT,
-                    getInlongGroupId(),
-                    getInlongSteamId(),
-                    System.currentTimeMillis(),
-                    rowCountSize,
-                    rowDataSize);
-        }
-    }
-
-    public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
-        if (this.numBytesIn != null) {
-            numBytesIn.inc(rowDataSize);
-        }
-        if (this.numRecordsIn != null) {
-            this.numRecordsIn.inc(rowCountSize);
+        if (sourceMetricData != null) {
+            sourceMetricData.outputMetrics(rowCountSize, rowDataSize);
         }
     }
 }
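
After this change MySqlSourceReaderMetrics is a thin facade over SourceMetricData: the source builds one MetricOption, passes it to registerMetrics(...), and the reader reports rows via outputMetrics(...). A usage sketch mirroring the MySqlSource hunk above (the wrapper class and parameter names are illustrative):

    import org.apache.flink.metrics.MetricGroup;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
    import org.apache.inlong.sort.cdc.mysql.source.metrics.MySqlSourceReaderMetrics;

    final class ReaderMetricsSketch {
        /** inlongMetric and inlongAudit are the values carried by the source config. */
        static MySqlSourceReaderMetrics create(MetricGroup metricGroup,
                String inlongMetric, String inlongAudit) {
            MySqlSourceReaderMetrics readerMetrics = new MySqlSourceReaderMetrics(metricGroup);
            MetricOption metricOption = MetricOption.builder()
                    .withInlongLabels(inlongMetric)
                    .withInlongAudit(inlongAudit)
                    .withRegisterMetric(RegisteredMetric.ALL)
                    .build();
            // registers the lag/idle gauges unconditionally, and the InLong
            // counters only when metricOption is non-null
            readerMetrics.registerMetrics(metricOption);
            return readerMetrics;
        }
    }

Record emission then reduces to readerMetrics.outputMetrics(rowCount, rowBytes), which only forwards to SourceMetricData when one was created.
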
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index c8a70b8bd..b4610d959 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 import org.apache.inlong.sort.cdc.debezium.table.DebeziumOptions;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions;
 import org.apache.inlong.sort.cdc.mysql.source.config.ServerIdRange;
@@ -124,7 +123,6 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
         final ReadableConfig config = helper.getOptions();
         final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         final String inlongAudit = config.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
         final String hostname = config.get(HOSTNAME);
         final String username = config.get(USERNAME);
         final String password = config.get(PASSWORD);
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
index 1458693fb..869a0a1cd 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
@@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.spi.OffsetCommitPolicy;
 import io.debezium.heartbeat.Heartbeat;
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -58,7 +57,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -68,9 +68,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,7 +79,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -416,21 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            AuditImp auditImp = null;
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                auditImp = AuditImp.getInstance();
-            }
-            metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
-            metricData.registerMetricsForNumRecordsIn();
-            metricData.registerMetricsForNumBytesIn();
-            metricData.registerMetricsForNumBytesInPerSecond();
-            metricData.registerMetricsForNumRecordsInPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(inlongAudit)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -470,8 +459,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                 if (metricData != null) {
-                                    metricData.outputMetrics(1L,
-                                            record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    metricData.outputMetricsWithEstimate(record.value());
                                 }
                                 deserializer.deserialize(record, out);
                             }
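
A note on outputMetricsWithEstimate: the helper itself is not part of this hunk, but judging from the call it replaces it folds the old per-record byte estimation into SourceMetricData. A hedged sketch of what the method presumably does (body inferred from the removed call, not copied from the commit):

    // Sketch only: assumed body of SourceMetricData#outputMetricsWithEstimate.
    public void outputMetricsWithEstimate(Object data) {
        long size = data == null
                ? 0L
                : data.toString().getBytes(StandardCharsets.UTF_8).length;
        // outputMetrics(rowCount, rowSize) is the pre-existing path that updates the
        // counters/meters and, when configured, reports to the InLong audit SDK.
        outputMetrics(1L, size);
    }
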
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index 0b4471ae3..d020d03f9 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -114,7 +113,6 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = config.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
         return new OracleTableSource(
                 physicalSchema,
                 port,
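
Here and in the other table factories touched by this commit, the explicit cross-check between the metric and audit options goes away together with ValidateMetricOptionUtils; any consistency handling is assumed to happen later, when the source builds its MetricOption. A sketch of what the factory side is left doing, under that assumption:

    // The factory only forwards whatever was configured; it no longer validates
    // the inlongMetric/inlongAudit pairing itself (assumed to be handled when
    // MetricOption.builder()...build() runs inside the source function).
    String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null); // may be null
    String inlongAudit = config.get(INLONG_AUDIT);                        // may be null
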
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index 4ef40bcc8..2ccf92421 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.spi.OffsetCommitPolicy;
 import io.debezium.heartbeat.Heartbeat;
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -51,7 +50,6 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -60,7 +58,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
@@ -72,9 +71,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -85,7 +82,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
@@ -441,29 +437,15 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            AuditImp auditImp = null;
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                auditImp = AuditImp.getInstance();
-            }
-            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
-            SimpleCounter recordsInCounter = new SimpleCounter();
-            SimpleCounter bytesInCounter = new SimpleCounter();
-            if (metricState != null) {
-                recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
-                bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
-            }
-            sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
-            sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
-            sourceMetricData.registerMetricsForNumRecordsInForMeter();
-            sourceMetricData.registerMetricsForNumBytesInForMeter();
-            sourceMetricData.registerMetricsForNumBytesInPerSecond();
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(inlongAudit)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -504,8 +486,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                 deserializer.deserialize(record, out);
                                 if (sourceMetricData != null) {
-                                    sourceMetricData.outputMetrics(1L,
-                                            record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    sourceMetricData.outputMetricsWithEstimate(record.value());
                                 }
                             }
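
Unlike the Oracle variant above, this source seeds the new counters from the metric state restored at startup, so numRecordsIn/numBytesIn continue from their last checkpointed totals instead of resetting to zero (the old SimpleCounter pre-increment handled this manually). A condensed sketch of the seeding, with hypothetical restored values:

    // metricState is read back from the operator state registered under
    // INLONG_METRIC_STATE_NAME; the example totals below are purely illustrative.
    long restoredRecords = metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L; // e.g. 1200000
    long restoredBytes = metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L;     // e.g. 350000000

    MetricOption metricOption = MetricOption.builder()
            .withInlongLabels(inlongMetric)
            .withInlongAudit(inlongAudit)
            .withInitRecords(restoredRecords) // counters start from the restored totals
            .withInitBytes(restoredBytes)
            .withRegisterMetric(RegisteredMetric.ALL)
            .build();
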
 
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
index a886f8aef..02c624290 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -127,7 +126,6 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = config.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
         return new PostgreSQLTableSource(
                 physicalSchema,
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java
deleted file mode 100644
index 18fd19955..000000000
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.inlong.sort.pulsar.table;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-public class Constants {
-
-    public static final ConfigOption<String> INLONG_METRIC =
-        ConfigOptions.key("inlong.metric")
-            .stringType()
-            .defaultValue("")
-            .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
-}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 6d88a303b..bd4ecaaee 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -33,7 +33,8 @@ import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
 import org.apache.pulsar.client.api.Message;
@@ -44,14 +45,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
 /**
  * A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}.
  */
@@ -76,12 +72,6 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
     private String inlongMetric;
     private String auditHostAndPorts;
 
-    private AuditImp auditImp;
-
-    private String inlongGroupId;
-
-    private String inlongStreamId;
-
     DynamicPulsarDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -120,23 +110,14 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
         }
         valueDeserialization.open(context);
 
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            inlongGroupId = inlongMetricArray[0];
-            inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, getMetricGroup(context));
-            sourceMetricData.registerMetricsForNumBytesIn();
-            sourceMetricData.registerMetricsForNumBytesInPerSecond();
-            sourceMetricData.registerMetricsForNumRecordsIn();
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, getMetricGroup(context));
         }
-
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
-        }
-
     }
 
     /**
@@ -191,7 +172,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
             valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
-                sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length);
+                sourceMetricData.outputMetricsWithEstimate(inputRow);
                 collector.collect(inputRow);
             }));
             return;
@@ -212,7 +193,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             outputCollector.collect(null);
         } else {
             valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
-                sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length);
+                sourceMetricData.outputMetricsWithEstimate(inputRow);
                 outputCollector.collect(inputRow);
             }));
         }
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index bf74dc8d2..5a3ddfe51 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -78,7 +78,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
 /**
  * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index ff0361b16..e4793dec7 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -68,7 +68,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
 import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
 /**
  * Upsert-Pulsar factory.
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
index c28115846..16ee84362 100644
--- a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.cdc.sqlserver.table;
 
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.Validator;
@@ -42,9 +41,7 @@ import io.debezium.heartbeat.Heartbeat;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -54,7 +51,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -76,8 +72,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -217,14 +213,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     private SourceMetricData metricData;
 
-    private String inlongGroupId;
-
     private String auditHostAndPorts;
 
-    private String inlongStreamId;
-
-    private transient AuditImp auditImp;
-
     // ---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
@@ -412,20 +402,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            inlongGroupId = inlongMetricArray[0];
-            inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, metricGroup);
-            metricData.registerMetricsForNumRecordsIn();
-            metricData.registerMetricsForNumBytesIn();
-            metricData.registerMetricsForNumBytesInPerSecond();
-            metricData.registerMetricsForNumRecordsInPerSecond();
-        }
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SourceMetricData(metricOption, metricGroup);
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -464,7 +447,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                         new DebeziumDeserializationSchema<T>() {
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
-                                outputMetrics(record);
+                                if (metricData != null) {
+                                    metricData.outputMetricsWithEstimate(record.value());
+                                }
                                 deserializer.deserialize(record, out);
                             }
 
@@ -502,23 +487,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         debeziumChangeFetcher.runFetchLoop();
     }
 
-    private void outputMetrics(SourceRecord record) {
-        if (metricData != null) {
-            metricData.getNumRecordsIn().inc(1L);
-            metricData.getNumBytesIn()
-                    .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
-        }
-        if (auditImp != null) {
-            auditImp.add(
-                Constants.AUDIT_SORT_INPUT,
-                inlongGroupId,
-                inlongStreamId,
-                System.currentTimeMillis(),
-                1,
-                record.value().toString().getBytes(StandardCharsets.UTF_8).length);
-        }
-    }
-
     @Override
     public void notifyCheckpointComplete(long checkpointId) {
         if (!debeziumStarted) {
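
The hunk above drops the connector-local outputMetrics()/AuditImp wiring; the audit report is assumed to be issued from inside SourceMetricData once withInlongAudit(...) is configured. A sketch of where the deleted logic presumably lives now, reconstructed from the removed lines (field names illustrative):

    // Assumed consolidation inside SourceMetricData: bump the Flink counters and,
    // when an audit proxy was configured, forward the same numbers to the audit SDK.
    public void outputMetrics(long rowCount, long rowSize) {
        numRecordsIn.inc(rowCount);
        numBytesIn.inc(rowSize);
        if (auditImp != null) {
            auditImp.add(
                    Constants.AUDIT_SORT_INPUT,
                    groupId,
                    streamId,
                    System.currentTimeMillis(),
                    rowCount,
                    rowSize);
        }
    }
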
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 3cc166601..53a8e217f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.parser.impl;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.function.EncryptFunction;
@@ -64,6 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Flink sql parse handler
@@ -149,7 +151,7 @@ public class FlinkSqlParser implements Parser {
         Preconditions.checkNotNull(streamInfo.getRelations(), "relations is null");
         Preconditions.checkState(!streamInfo.getRelations().isEmpty(), "relations is empty");
         log.info("start parse stream, streamId:{}", streamInfo.getStreamId());
-        // Inject the `inlong.metric` for ExtractNode or LoadNode
+        // Inject the metric option for ExtractNode or LoadNode
         injectInlongMetric(streamInfo);
         Map<String, Node> nodeMap = new HashMap<>(streamInfo.getNodes().size());
         streamInfo.getNodes().forEach(s -> {
@@ -169,7 +171,7 @@ public class FlinkSqlParser implements Parser {
     }
 
     /**
-     * Inject the `inlong.metric` for ExtractNode or LoadNode
+     * Inject the metric option for ExtractNode or LoadNode
      *
      * @param streamInfo The encapsulation of nodes and node relations
      */
@@ -183,16 +185,19 @@ public class FlinkSqlParser implements Parser {
                 } else if (node instanceof ExtractNode) {
                     ((ExtractNode) node).setProperties(properties);
                 } else {
-                    throw new UnsupportedOperationException(String.format("Unsupported inlong metric for: %s",
-                            node.getClass().getSimpleName()));
+                    throw new UnsupportedOperationException(String.format(
+                            "Unsupported inlong group stream node for: %s", node.getClass().getSimpleName()));
                 }
             }
-            properties.put(InlongMetric.METRIC_KEY,
-                    String.format(InlongMetric.METRIC_VALUE_FORMAT, groupInfo.getGroupId(),
-                            streamInfo.getStreamId(), node.getId()));
-            if (StringUtils.isNotEmpty(groupInfo.getProperties().get(InlongMetric.AUDIT_KEY))) {
-                properties.put(InlongMetric.AUDIT_KEY,
-                        groupInfo.getProperties().get(InlongMetric.AUDIT_KEY));
+            properties.put(Constants.METRICS_LABELS.key(),
+                    Stream.of(Constants.GROUP_ID + "=" + groupInfo.getGroupId(),
+                                    Constants.STREAM_ID + "=" + streamInfo.getStreamId(),
+                                    Constants.NODE_ID + "=" + node.getId())
+                            .collect(Collectors.joining("&")));
+            // METRICS_AUDIT_PROXY_HOSTS depends on INLONG_GROUP_STREAM_NODE
+            if (StringUtils.isNotEmpty(groupInfo.getProperties().get(Constants.METRICS_AUDIT_PROXY_HOSTS.key()))) {
+                properties.put(Constants.METRICS_AUDIT_PROXY_HOSTS.key(),
+                        groupInfo.getProperties().get(Constants.METRICS_AUDIT_PROXY_HOSTS.key()));
             }
         });
     }
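
For concreteness, a hypothetical rendering of the two properties this loop places on every ExtractNode and LoadNode (ids invented for illustration; the audit entry is only copied through when the group-level property is present):

    // Assuming GROUP_ID/STREAM_ID/NODE_ID render as groupId/streamId/nodeId, the label
    // value becomes "groupId=test_group&streamId=test_stream&nodeId=load_node_1".
    properties.put(Constants.METRICS_LABELS.key(),
            Stream.of(Constants.GROUP_ID + "=" + "test_group",
                            Constants.STREAM_ID + "=" + "test_stream",
                            Constants.NODE_ID + "=" + "load_node_1")
                    .collect(Collectors.joining("&")));
    // Audit proxy hosts pass through unchanged from the group-level properties.
    properties.put(Constants.METRICS_AUDIT_PROXY_HOSTS.key(), "audit-proxy-host:10081");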