Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/19 11:30:13 UTC

[inlong] branch release-1.3.0 updated (dd34252d9 -> eb00535bf)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from dd34252d9 [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
     new aaf175ba4 [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
     new eb00535bf [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive (#5906)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../inlong/sort/configuration/Constants.java       |   22 +-
 .../apache/inlong/sort/protocol/InlongMetric.java  |   22 +-
 .../org/apache/inlong/sort/base/Constants.java     |   15 +-
 .../apache/inlong/sort/base/metric/MetricData.java |   35 +-
 .../inlong/sort/base/metric/MetricOption.java      |  141 ++-
 .../inlong/sort/base/metric/MetricState.java       |    8 +
 .../inlong/sort/base/metric/SinkMetricData.java    |  180 ++-
 .../inlong/sort/base/metric/SourceMetricData.java  |  105 +-
 .../inlong/sort/base/util/MetricStateUtils.java    |   24 +
 .../sort/base/util/ValidateMetricOptionUtils.java  |   39 -
 .../sort/elasticsearch/ElasticsearchSinkBase.java  |   26 +-
 .../table/RowElasticsearchSinkFunction.java        |   55 +-
 .../sort/filesystem/FileSystemTableSink.java       |    2 -
 .../filesystem/stream/AbstractStreamingWriter.java |   25 +-
 .../sort/hbase/HBase2DynamicTableFactory.java      |    2 -
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  |   46 +-
 .../hive/filesystem/AbstractStreamingWriter.java   |   25 +-
 .../iceberg/flink/FlinkDynamicTableFactory.java    |   11 +-
 .../sort/iceberg/flink/IcebergTableSink.java       |    2 +-
 .../inlong/sort/iceberg/flink/sink/FlinkSink.java  |    2 +-
 .../iceberg/flink/sink/IcebergStreamWriter.java    |    7 -
 .../inlong/sort/iceberg/IcebergTableSink.java      |    2 +-
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |    2 +-
 .../sort/iceberg/sink/IcebergStreamWriter.java     |   25 +-
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |   76 +-
 inlong-sort/sort-connectors/kafka/pom.xml          |    6 +
 .../inlong/sort/kafka/FlinkKafkaConsumer.java      |  352 ++++++
 .../inlong/sort/kafka/FlinkKafkaConsumerBase.java  | 1330 ++++++++++++++++++++
 .../inlong/sort/kafka/FlinkKafkaProducer.java      |  103 +-
 .../table/DynamicKafkaDeserializationSchema.java   |   69 +-
 .../sort/kafka/table/KafkaDynamicSource.java       |   82 +-
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   |   32 +-
 .../mongodb/table/MongoDBTableSourceFactory.java   |    2 -
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |   32 +-
 .../inlong/sort/cdc/mysql/source/MySqlSource.java  |   36 +-
 .../source/metrics/MySqlSourceReaderMetrics.java   |  124 +-
 .../mysql/table/MySqlTableInlongSourceFactory.java |    2 -
 .../sort/cdc/oracle/DebeziumSourceFunction.java    |   32 +-
 .../cdc/oracle/table/OracleTableSourceFactory.java |    2 -
 .../DebeziumSourceFunction.java                    |   43 +-
 .../cdc/postgres/table/PostgreSQLTableFactory.java |    2 -
 .../apache/inlong/sort/pulsar/table/Constants.java |   32 -
 .../table/DynamicPulsarDeserializationSchema.java  |   41 +-
 .../pulsar/table/PulsarDynamicTableFactory.java    |    2 +-
 .../table/UpsertPulsarDynamicTableFactory.java     |    2 +-
 .../sqlserver/table/DebeziumSourceFunction.java    |   56 +-
 .../inlong/sort/parser/impl/FlinkSqlParser.java    |   25 +-
 licenses/inlong-sort-connectors/LICENSE            |   12 +
 48 files changed, 2388 insertions(+), 930 deletions(-)
 delete mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
 create mode 100644 inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
 create mode 100644 inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
 delete mode 100644 inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java


[inlong] 02/02: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive (#5906)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit eb00535bf0ac9fff42c58aebeb22b05b9b899829
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Mon Sep 19 19:25:03 2022 +0800

    [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive (#5906)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../inlong/sort/configuration/Constants.java       |  22 +++-
 .../apache/inlong/sort/protocol/InlongMetric.java  |  22 +---
 .../org/apache/inlong/sort/base/Constants.java     |  11 +-
 .../apache/inlong/sort/base/metric/MetricData.java |  35 ++++-
 .../inlong/sort/base/metric/MetricOption.java      | 141 ++++++++++++++++-----
 .../inlong/sort/base/metric/SinkMetricData.java    | 128 +++++++++++--------
 .../inlong/sort/base/metric/SourceMetricData.java  | 105 +++++++--------
 .../sort/base/util/ValidateMetricOptionUtils.java  |  39 ------
 .../sort/elasticsearch/ElasticsearchSinkBase.java  |  26 ++--
 .../table/RowElasticsearchSinkFunction.java        |  55 ++------
 .../sort/filesystem/FileSystemTableSink.java       |   2 -
 .../filesystem/stream/AbstractStreamingWriter.java |  25 ++--
 .../sort/hbase/HBase2DynamicTableFactory.java      |   2 -
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  |  46 +++----
 .../hive/filesystem/AbstractStreamingWriter.java   |  25 ++--
 .../iceberg/flink/FlinkDynamicTableFactory.java    |  11 +-
 .../sort/iceberg/flink/IcebergTableSink.java       |   2 +-
 .../inlong/sort/iceberg/flink/sink/FlinkSink.java  |   2 +-
 .../iceberg/flink/sink/IcebergStreamWriter.java    |   7 -
 .../inlong/sort/iceberg/IcebergTableSink.java      |   2 +-
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |   2 +-
 .../sort/iceberg/sink/IcebergStreamWriter.java     |  25 ++--
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |  76 +++--------
 .../inlong/sort/kafka/FlinkKafkaConsumerBase.java  |  52 +++-----
 .../inlong/sort/kafka/FlinkKafkaProducer.java      |  48 ++-----
 .../table/DynamicKafkaDeserializationSchema.java   |   8 +-
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   |  32 ++---
 .../mongodb/table/MongoDBTableSourceFactory.java   |   2 -
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |  34 ++---
 .../inlong/sort/cdc/mysql/source/MySqlSource.java  |  36 ++----
 .../source/metrics/MySqlSourceReaderMetrics.java   | 124 ++----------------
 .../mysql/table/MySqlTableInlongSourceFactory.java |   2 -
 .../sort/cdc/oracle/DebeziumSourceFunction.java    |  32 ++---
 .../cdc/oracle/table/OracleTableSourceFactory.java |   2 -
 .../DebeziumSourceFunction.java                    |  43 ++-----
 .../cdc/postgres/table/PostgreSQLTableFactory.java |   2 -
 .../apache/inlong/sort/pulsar/table/Constants.java |  32 -----
 .../table/DynamicPulsarDeserializationSchema.java  |  41 ++----
 .../pulsar/table/PulsarDynamicTableFactory.java    |   2 +-
 .../table/UpsertPulsarDynamicTableFactory.java     |   2 +-
 .../sqlserver/table/DebeziumSourceFunction.java    |  56 ++------
 .../inlong/sort/parser/impl/FlinkSqlParser.java    |  25 ++--
 42 files changed, 515 insertions(+), 871 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index b0979c62f..e4d6a3906 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -60,6 +60,12 @@ public class Constants {
 
     public static final String HIVE_SINK_ORC_PREFIX = HIVE_SINK_PREFIX + "orc.";
 
+    public static final String GROUP_ID = "groupId";
+
+    public static final String STREAM_ID = "streamId";
+
+    public static final String NODE_ID = "nodeId";
+
     // ------------------------------------------------------------------------
     //  Operator uid
     // ------------------------------------------------------------------------
@@ -275,10 +281,18 @@ public class Constants {
             .defaultValue(5)
             .withDescription("minutes");
 
-    public static final ConfigOption<String> METRICS_AUDIT_PROXY_HOSTS = key("metrics.audit.proxy.hosts")
-            .noDefaultValue()
-            .withDescription("Audit proxy host address for reporting audit metrics. "
-                    + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+    public static final ConfigOption<String> METRICS_LABELS =
+            ConfigOptions.key("inlong.metric.labels")
+                    .noDefaultValue()
+                    .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
+                            + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
+
+
+    public static final ConfigOption<String> METRICS_AUDIT_PROXY_HOSTS =
+            ConfigOptions.key("metrics.audit.proxy.hosts")
+                    .noDefaultValue()
+                    .withDescription("Audit proxy host address for reporting audit metrics. \n"
+                            + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
     // ------------------------------------------------------------------------
     //  Single tenant related
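
The new `inlong.metric.labels` option carries an ampersand-delimited list of key=value pairs, defaulting to the form 'groupId=xxx&streamId=xxx&nodeId=xxx'. A minimal parsing sketch under that format (the class and label values below are illustrative, not part of this commit):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public final class MetricLabelParseSketch {
        /** Parses a value such as "groupId=g1&streamId=s1&nodeId=n1" into ordered labels. */
        public static Map<String, String> parse(String inlongLabels) {
            Map<String, String> labels = new LinkedHashMap<>();
            for (String label : inlongLabels.split("&")) {
                int idx = label.indexOf('=');
                if (idx <= 0) {
                    throw new IllegalArgumentException("label format must be key=value: " + label);
                }
                labels.put(label.substring(0, idx), label.substring(idx + 1));
            }
            return labels;
        }
    }

MetricOption below performs the same split using Constants.DELIMITER.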
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
index 4afb46a89..b90a38ccb 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
@@ -19,29 +19,11 @@ package org.apache.inlong.sort.protocol;
 
 /**
  * The class is the abstract of Inlong Metric.
- * We agree that the key of the inlong metric report is `inlong.metric`,
+ * We agree that the key of the inlong metric report is
+ * {@link org.apache.inlong.sort.configuration.Constants#METRICS_GROUP_STREAM_NODE},
  * and its value is format by `groupId&streamId&nodeId`.
  * If node implements this interface, we will inject the key and value into the corresponding Sort-Connectors
  * during flink sql parser
  */
 public interface InlongMetric {
-
-    /**
-     * The key of metric, it must be `inlong.metric` here.
-     */
-    String METRIC_KEY = "inlong.metric";
-
-    /**
-     * The value format, it must be `groupId&streamId&nodeId` here.
-     * The groupId is the id of Inlong Group
-     * The streamId is the id of Inlong Stream
-     * The nodeId is the id of Inlong Source or Sink
-     */
-    String METRIC_VALUE_FORMAT = "%s&%s&%s";
-
-    /**
-     * The key of InLong audit, the value should be ip:port&ip:port
-     */
-    String AUDIT_KEY = "inlong.audit";
-
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 18ff408f2..679bcc6c3 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -86,17 +86,18 @@ public final class Constants {
     public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
 
     public static final ConfigOption<String> INLONG_METRIC =
-        ConfigOptions.key("inlong.metric")
+        ConfigOptions.key("inlong.metric.labels")
             .stringType()
             .noDefaultValue()
-            .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
+            .withDescription("INLONG metric labels, format is 'key1=value1&key2=value2',"
+                    + "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
 
     public static final ConfigOption<String> INLONG_AUDIT =
-        ConfigOptions.key("inlong.audit")
+        ConfigOptions.key("metrics.audit.proxy.hosts")
             .stringType()
             .noDefaultValue()
-            .withDescription("INLONG AUDIT HOST + '&' + PORT");
+            .withDescription("Audit proxy host address for reporting audit metrics. \n"
+                    + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
     public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
             ConfigOptions.key("sink.ignore.changelog")
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
index 681318ff7..381fa5ac4 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
@@ -23,6 +23,8 @@ import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 
+import java.util.Map;
+
 import static org.apache.inlong.sort.base.Constants.GROUP_ID;
 import static org.apache.inlong.sort.base.Constants.NODE_ID;
 import static org.apache.inlong.sort.base.Constants.STREAM_ID;
@@ -40,26 +42,39 @@ public interface MetricData {
      */
     MetricGroup getMetricGroup();
 
+    /**
+     * Get labels
+     *
+     * @return The labels defined in inlong
+     */
+    Map<String, String> getLabels();
+
     /**
      * Get group id
      *
      * @return The group id defined in inlong
      */
-    String getGroupId();
+    default String getGroupId() {
+        return getLabels().get(GROUP_ID);
+    }
 
     /**
      * Get stream id
      *
      * @return The stream id defined in inlong
      */
-    String getStreamId();
+    default String getStreamId() {
+        return getLabels().get(STREAM_ID);
+    }
 
     /**
      * Get node id
      *
      * @return The node id defined in inlong
      */
-    String getNodeId();
+    default String getNodeId() {
+        return getLabels().get(NODE_ID);
+    }
 
     /**
      * Register a counter metric
@@ -69,8 +84,11 @@ public interface MetricData {
      * @return Counter of registered
      */
     default Counter registerCounter(String metricName, Counter counter) {
-        return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId())
-                .addGroup(NODE_ID, getNodeId()).counter(metricName, counter);
+        MetricGroup inlongMetricGroup = getMetricGroup();
+        for (Map.Entry<String, String> label : getLabels().entrySet()) {
+            inlongMetricGroup = inlongMetricGroup.addGroup(label.getKey(), label.getValue());
+        }
+        return inlongMetricGroup.counter(metricName, counter);
     }
 
     /**
@@ -90,8 +108,11 @@ public interface MetricData {
      * @return Meter of registered
      */
     default Meter registerMeter(String metricName, Counter counter) {
-        return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId())
-                .addGroup(NODE_ID, getNodeId()).meter(metricName, new MeterView(counter, TIME_SPAN_IN_SECONDS));
+        MetricGroup inlongMetricGroup = getMetricGroup();
+        for (Map.Entry<String, String> label : getLabels().entrySet()) {
+            inlongMetricGroup = inlongMetricGroup.addGroup(label.getKey(), label.getValue());
+        }
+        return inlongMetricGroup.meter(metricName, new MeterView(counter, TIME_SPAN_IN_SECONDS));
     }
 
 }
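
With the label map in place, registerCounter and registerMeter add one Flink metric group per label instead of the fixed groupId/streamId/nodeId chain. For the default three labels the loop is equivalent to this sketch (label values are hypothetical):

    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.MetricGroup;
    import org.apache.flink.metrics.SimpleCounter;

    final class LabelNestingSketch {
        // Equivalent of the label loop when labels = {groupId=g1, streamId=s1, nodeId=n1}
        static Counter register(MetricGroup metricGroup, String metricName) {
            return metricGroup
                    .addGroup("groupId", "g1")
                    .addGroup("streamId", "s1")
                    .addGroup("nodeId", "n1")
                    .counter(metricName, new SimpleCounter());
        }
    }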
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index d2179ae54..f4c679f9c 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -19,13 +19,19 @@
 package org.apache.inlong.sort.base.metric;
 
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
+import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.GROUP_ID;
+import static org.apache.inlong.sort.base.Constants.STREAM_ID;
 
 public class MetricOption {
     private static final String IP_OR_HOST_PORT = "^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
@@ -35,55 +41,126 @@ public class MetricOption {
             + "3}|65[0-4]\\d{"
             + "2}|655[0-2]\\d|6553[0-5])$";
 
-    private final String groupId;
-    private final String streamId;
-    private final String nodeId;
+    private Map<String, String> labels;
     private final HashSet<String> ipPortList;
-    private String ipPorts;
+    private Optional<String> ipPorts;
+    private RegisteredMetric registeredMetric;
+    private long initRecords;
+    private long initBytes;
 
-    public MetricOption(String inLongMetric) {
-        this(inLongMetric, null);
-    }
+    private MetricOption(
+            String inlongLabels,
+            @Nullable String inlongAudit,
+            RegisteredMetric registeredMetric,
+            long initRecords,
+            long initBytes) {
+        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
+                "Inlong labels must be set for register metric.");
 
-    public MetricOption(String inLongMetric, @Nullable String inLongAudit) {
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inLongMetric, inLongAudit);
-        String[] inLongMetricArray = inLongMetric.split(DELIMITER);
-        Preconditions.checkArgument(inLongMetricArray.length == 3,
-                "Error inLong metric format: " + inLongMetric);
-        this.groupId = inLongMetricArray[0];
-        this.streamId = inLongMetricArray[1];
-        this.nodeId = inLongMetricArray[2];
-        this.ipPortList = new HashSet<>();
-        this.ipPorts = null;
+        this.initRecords = initRecords;
+        this.initBytes = initBytes;
+        this.labels = new LinkedHashMap<>();
+        String[] inLongLabelArray = inlongLabels.split(DELIMITER);
+        Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")),
+                "InLong metric label format must be xxx=xxx");
+        Stream.of(inLongLabelArray).forEach(label -> {
+            String key = label.substring(0, label.indexOf('='));
+            String value = label.substring(label.indexOf('=') + 1);
+            labels.put(key, value);
+        });
 
-        if (inLongAudit != null) {
-            String[] ipPortStrs = inLongAudit.split(DELIMITER);
-            this.ipPorts = inLongAudit;
+        this.ipPortList = new HashSet<>();
+        this.ipPorts = Optional.ofNullable(inlongAudit);
+        if (ipPorts.isPresent()) {
+            Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID),
+                    "groupId and streamId must be set when enable inlong audit collect.");
+            String[] ipPortStrs = inlongAudit.split(DELIMITER);
             for (String ipPort : ipPortStrs) {
                 Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT, ipPort),
-                        "Error inLong audit format: " + inLongAudit);
+                        "Error inLong audit format: " + inlongAudit);
                 this.ipPortList.add(ipPort);
             }
         }
-    }
 
-    public String getGroupId() {
-        return groupId;
-    }
-
-    public String getStreamId() {
-        return streamId;
+        if (registeredMetric != null) {
+            this.registeredMetric = registeredMetric;
+        }
     }
 
-    public String getNodeId() {
-        return nodeId;
+    public Map<String, String> getLabels() {
+        return labels;
     }
 
     public HashSet<String> getIpPortList() {
         return ipPortList;
     }
 
-    public String getIpPorts() {
+    public Optional<String> getIpPorts() {
         return ipPorts;
     }
+
+    public RegisteredMetric getRegisteredMetric() {
+        return registeredMetric;
+    }
+
+    public long getInitRecords() {
+        return initRecords;
+    }
+
+    public long getInitBytes() {
+        return initBytes;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public enum RegisteredMetric {
+        ALL,
+        NORMAL,
+        DIRTY
+    }
+
+    public static class Builder {
+        private String inlongLabels;
+        private String inlongAudit;
+        private RegisteredMetric registeredMetric = RegisteredMetric.ALL;
+        private long initRecords = 0L;
+        private long initBytes = 0L;
+
+        private Builder() {
+        }
+
+        public MetricOption.Builder withInlongLabels(String inlongLabels) {
+            this.inlongLabels = inlongLabels;
+            return this;
+        }
+
+        public MetricOption.Builder withInlongAudit(String inlongAudit) {
+            this.inlongAudit = inlongAudit;
+            return this;
+        }
+
+        public MetricOption.Builder withRegisterMetric(RegisteredMetric registeredMetric) {
+            this.registeredMetric = registeredMetric;
+            return this;
+        }
+
+        public MetricOption.Builder withInitRecords(long initRecords) {
+            this.initRecords = initRecords;
+            return this;
+        }
+
+        public MetricOption.Builder withInitBytes(long initBytes) {
+            this.initBytes = initBytes;
+            return this;
+        }
+
+        public MetricOption build() {
+            if (inlongLabels == null && inlongAudit == null) {
+                return null;
+            }
+            return new MetricOption(inlongLabels, inlongAudit, registeredMetric, initRecords, initBytes);
+        }
+    }
 }
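
MetricOption is now constructed only through its builder, and build() returns null when neither labels nor audit are configured; connectors use that null check to decide whether to register InLong metrics at all. A wiring sketch in the style of the connector changes below (the label string and audit address are illustrative values):

    import org.apache.flink.metrics.MetricGroup;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
    import org.apache.inlong.sort.base.metric.SinkMetricData;

    final class MetricOptionWiringSketch {
        static SinkMetricData create(MetricGroup metricGroup) {
            MetricOption metricOption = MetricOption.builder()
                    .withInlongLabels("groupId=g1&streamId=s1&nodeId=n1") // hypothetical labels
                    .withInlongAudit("127.0.0.1:10081")                   // hypothetical audit proxy
                    .withRegisterMetric(RegisteredMetric.ALL)
                    .withInitRecords(0L)
                    .withInitBytes(0L)
                    .build();
            return metricOption == null ? null : new SinkMetricData(metricOption, metricGroup);
        }
    }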
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 4073ddd44..d065496e4 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -25,12 +25,9 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
 
-import javax.annotation.Nullable;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.Map;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
@@ -45,10 +42,8 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
  */
 public class SinkMetricData implements MetricData {
 
-    private final MetricGroup metricGroup;
-    private final String groupId;
-    private final String streamId;
-    private final String nodeId;
+    private MetricGroup metricGroup;
+    private Map<String, String> labels;
     private AuditImp auditImp;
     private Counter numRecordsOut;
     private Counter numBytesOut;
@@ -59,23 +54,46 @@ public class SinkMetricData implements MetricData {
     private Meter numRecordsOutPerSecond;
     private Meter numBytesOutPerSecond;
 
-    public SinkMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
-        this(groupId, streamId, nodeId, metricGroup, null);
-    }
-
     public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
-        this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
-    }
-
-    public SinkMetricData(
-            String groupId, String streamId, String nodeId, MetricGroup metricGroup,
-            @Nullable String auditHostAndPorts) {
         this.metricGroup = metricGroup;
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+        this.labels = option.getLabels();
+
+        ThreadSafeCounter recordsOutCounter = new ThreadSafeCounter();
+        ThreadSafeCounter bytesOutCounter = new ThreadSafeCounter();
+        switch (option.getRegisteredMetric()) {
+            case DIRTY:
+                registerMetricsForDirtyBytes(new ThreadSafeCounter());
+                registerMetricsForDirtyRecords(new ThreadSafeCounter());
+                break;
+            case NORMAL:
+                registerMetricsForNumBytesOut(new ThreadSafeCounter());
+                registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+                registerMetricsForNumBytesOutPerSecond();
+                registerMetricsForNumRecordsOutPerSecond();
+
+                recordsOutCounter.inc(option.getInitRecords());
+                bytesOutCounter.inc(option.getInitBytes());
+                registerMetricsForNumRecordsOutForMeter(recordsOutCounter);
+                registerMetricsForNumRecordsOutForMeter(bytesOutCounter);
+                break;
+            default:
+                registerMetricsForDirtyBytes(new ThreadSafeCounter());
+                registerMetricsForDirtyRecords(new ThreadSafeCounter());
+                registerMetricsForNumBytesOut(new ThreadSafeCounter());
+                registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+                registerMetricsForNumBytesOutPerSecond();
+                registerMetricsForNumRecordsOutPerSecond();
+
+                recordsOutCounter.inc(option.getInitRecords());
+                bytesOutCounter.inc(option.getInitBytes());
+                registerMetricsForNumRecordsOutForMeter(recordsOutCounter);
+                registerMetricsForNumRecordsOutForMeter(bytesOutCounter);
+                break;
+
+        }
+
+        if (option.getIpPorts().isPresent()) {
+            AuditImp.getInstance().setAuditProxy(option.getIpPortList());
             this.auditImp = AuditImp.getInstance();
         }
     }
@@ -218,18 +236,8 @@ public class SinkMetricData implements MetricData {
     }
 
     @Override
-    public String getGroupId() {
-        return groupId;
-    }
-
-    @Override
-    public String getStreamId() {
-        return streamId;
-    }
-
-    @Override
-    public String getNodeId() {
-        return nodeId;
+    public Map<String, String> getLabels() {
+        return labels;
     }
 
     public Counter getNumRecordsOutForMeter() {
@@ -242,26 +250,26 @@ public class SinkMetricData implements MetricData {
 
     public void invokeWithEstimate(Object o) {
         long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
-        this.numRecordsOut.inc();
-        this.numBytesOut.inc(size);
-        this.numRecordsOutForMeter.inc();
-        this.numBytesOutForMeter.inc(size);
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_OUTPUT,
-                    getGroupId(),
-                    getStreamId(),
-                    System.currentTimeMillis(),
-                    1,
-                    size);
-        }
+        invoke(1, size);
     }
 
     public void invoke(long rowCount, long rowSize) {
-        this.numRecordsOut.inc(rowCount);
-        this.numBytesOut.inc(rowSize);
-        this.numRecordsOutForMeter.inc(rowCount);
-        this.numBytesOutForMeter.inc(rowSize);
+        if (numRecordsOut != null) {
+            numRecordsOut.inc(rowCount);
+        }
+
+        if (numBytesOut != null) {
+            numBytesOut.inc(rowSize);
+        }
+
+        if (numRecordsOutForMeter != null) {
+            numRecordsOutForMeter.inc(rowCount);
+        }
+
+        if (numBytesOutForMeter != null) {
+            numBytesOutForMeter.inc(rowCount);
+        }
+
         if (auditImp != null) {
             auditImp.add(
                     Constants.AUDIT_SORT_OUTPUT,
@@ -273,12 +281,22 @@ public class SinkMetricData implements MetricData {
         }
     }
 
+    public void invokeDirty(long rowCount, long rowSize) {
+        if (dirtyRecords != null) {
+            dirtyRecords.inc(rowCount);
+        }
+
+        if (dirtyBytes != null) {
+            dirtyBytes.inc(rowSize);
+        }
+    }
+
     @Override
     public String toString() {
         return "SinkMetricData{"
-                + "groupId='" + groupId + '\''
-                + ", streamId='" + streamId + '\''
-                + ", nodeId='" + nodeId + '\''
+                + "metricGroup=" + metricGroup
+                + ", labels=" + labels
+                + ", auditImp=" + auditImp
                 + ", numRecordsOut=" + numRecordsOut.getCount()
                 + ", numBytesOut=" + numBytesOut.getCount()
                 + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 5c25fcc75..3cffcfe54 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -25,12 +25,9 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
 
-import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
 
-import java.util.Arrays;
-import java.util.HashSet;
-
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
@@ -43,46 +40,39 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
  */
 public class SourceMetricData implements MetricData {
 
-    private final MetricGroup metricGroup;
-    private final String groupId;
-    private final String streamId;
-    private final String nodeId;
+    private MetricGroup metricGroup;
+    private Map<String, String> labels;
     private Counter numRecordsIn;
     private Counter numBytesIn;
     private Counter numRecordsInForMeter;
     private Counter numBytesInForMeter;
     private Meter numRecordsInPerSecond;
     private Meter numBytesInPerSecond;
-    private final AuditImp auditImp;
-
-    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
-        this(groupId, streamId, nodeId, metricGroup, (AuditImp) null);
-    }
+    private AuditImp auditImp;
 
     public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
-        this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
-    }
-
-    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
-            AuditImp auditImp) {
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
         this.metricGroup = metricGroup;
-        this.auditImp = auditImp;
-    }
+        this.labels = option.getLabels();
+
+        SimpleCounter recordsInCounter = new SimpleCounter();
+        SimpleCounter bytesInCounter = new SimpleCounter();
+        switch (option.getRegisteredMetric()) {
+            default:
+                registerMetricsForNumRecordsIn();
+                registerMetricsForNumBytesIn();
+                registerMetricsForNumBytesInPerSecond();
+                registerMetricsForNumRecordsInPerSecond();
+
+                recordsInCounter.inc(option.getInitRecords());
+                bytesInCounter.inc(option.getInitBytes());
+                registerMetricsForNumBytesInForMeter(recordsInCounter);
+                registerMetricsForNumRecordsInForMeter(bytesInCounter);
+                break;
+        }
 
-    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
-            @Nullable String auditHostAndPorts) {
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
-        this.metricGroup = metricGroup;
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+        if (option.getIpPorts() != null) {
+            AuditImp.getInstance().setAuditProxy(option.getIpPortList());
             this.auditImp = AuditImp.getInstance();
-        } else {
-            this.auditImp = null;
         }
     }
 
@@ -196,26 +186,32 @@ public class SourceMetricData implements MetricData {
     }
 
     @Override
-    public String getGroupId() {
-        return groupId;
-    }
-
-    @Override
-    public String getStreamId() {
-        return streamId;
+    public Map<String, String> getLabels() {
+        return labels;
     }
 
-    @Override
-    public String getNodeId() {
-        return nodeId;
+    public void outputMetricsWithEstimate(Object o) {
+        long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
+        outputMetrics(1, size);
     }
 
     public void outputMetrics(long rowCountSize, long rowDataSize) {
-        outputMetricForFlink(rowCountSize, rowDataSize);
-        outputMetricForAudit(rowCountSize, rowDataSize);
-    }
+        if (numRecordsIn != null) {
+            this.numRecordsIn.inc(rowCountSize);
+        }
+
+        if (numBytesIn != null) {
+            this.numBytesIn.inc(rowDataSize);
+        }
+
+        if (numRecordsInForMeter != null) {
+            this.numRecordsInForMeter.inc(rowCountSize);
+        }
+
+        if (numBytesInForMeter != null) {
+            this.numBytesInForMeter.inc(rowDataSize);
+        }
 
-    public void outputMetricForAudit(long rowCountSize, long rowDataSize) {
         if (auditImp != null) {
             auditImp.add(
                     Constants.AUDIT_SORT_INPUT,
@@ -227,25 +223,18 @@ public class SourceMetricData implements MetricData {
         }
     }
 
-    public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
-        this.numBytesIn.inc(rowDataSize);
-        this.numRecordsIn.inc(rowCountSize);
-        this.numBytesInForMeter.inc(rowDataSize);
-        this.numRecordsInForMeter.inc(rowCountSize);
-    }
-
     @Override
     public String toString() {
         return "SourceMetricData{"
-                + "groupId='" + groupId + '\''
-                + ", streamId='" + streamId + '\''
-                + ", nodeId='" + nodeId + '\''
+                + "metricGroup=" + metricGroup
+                + ", labels=" + labels
                 + ", numRecordsIn=" + numRecordsIn.getCount()
                 + ", numBytesIn=" + numBytesIn.getCount()
                 + ", numRecordsInForMeter=" + numRecordsInForMeter.getCount()
                 + ", numBytesInForMeter=" + numBytesInForMeter.getCount()
                 + ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
                 + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
+                + ", auditImp=" + auditImp
                 + '}';
     }
 }
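
The source side is symmetric: sources build a SourceMetricData from the same MetricOption and report per record via outputMetricsWithEstimate, which sizes each record as the UTF-8 length of its string form. A deserialization-path sketch (record handling is illustrative):

    import org.apache.flink.metrics.MetricGroup;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.SourceMetricData;

    final class SourceReportingSketch {
        private final SourceMetricData sourceMetricData;

        SourceReportingSketch(MetricOption metricOption, MetricGroup metricGroup) {
            this.sourceMetricData = new SourceMetricData(metricOption, metricGroup);
        }

        void onRecord(Object record) {
            // Counts one record plus its estimated byte size (numRecordsIn / numBytesIn, audit if configured)
            sourceMetricData.outputMetricsWithEstimate(record);
        }
    }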
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
deleted file mode 100644
index bd58e31ae..000000000
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.inlong.sort.base.util;
-
-import org.apache.flink.table.api.ValidationException;
-
-/**
- * validate option tool
- */
-public class ValidateMetricOptionUtils {
-
-    /**
-     * validate inlong metric when set inlong audit
-     * @param inlongMetric inlong.metric option value
-     * @param inlongAudit inlong.audit option value
-     */
-    public static void validateInlongMetricIfSetInlongAudit(String inlongMetric, String inlongAudit) {
-        if (inlongAudit != null && inlongMetric == null) {
-            throw new ValidationException("inlong metric is necessary when set inlong audit");
-        }
-    }
-
-}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 000e1c23a..cdea07fa5 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.DocWriteRequest;
@@ -50,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * Base class for all Flink Elasticsearch Sinks.
@@ -266,15 +267,14 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
     @Override
     public void open(Configuration parameters) throws Exception {
         client = callBridge.createClient(userConfig);
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
-            sinkMetricData.registerMetricsForDirtyBytes();
-            sinkMetricData.registerMetricsForDirtyRecords();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withRegisterMetric(RegisteredMetric.DIRTY)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
+
         callBridge.verifyClientConnection(client);
         bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
         requestIndexer =
@@ -482,8 +482,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
                         if (failure != null) {
                             restStatus = itemResponse.getFailure().getStatus();
                             actionRequest = request.requests().get(i);
-                            if (sinkMetricData.getDirtyRecords() != null) {
-                                sinkMetricData.getDirtyRecords().inc();
+                            if (sinkMetricData != null) {
+                                sinkMetricData.invokeDirty(1, 0);
                             }
                             if (restStatus == null) {
                                 if (actionRequest instanceof ActionRequest) {
@@ -526,8 +526,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
         public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
             try {
                 for (DocWriteRequest writeRequest : request.requests()) {
-                    if (sinkMetricData.getDirtyRecords() != null) {
-                        sinkMetricData.getDirtyRecords().inc();
+                    if (sinkMetricData != null) {
+                        sinkMetricData.invokeDirty(1, 0);
                     }
                     if (writeRequest instanceof ActionRequest) {
                         failureHandler.onFailure(
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 0ae93231d..e4f1419c5 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -20,8 +20,8 @@ package org.apache.inlong.sort.elasticsearch.table;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.table.api.TableException;
@@ -37,13 +37,9 @@ import org.elasticsearch.common.xcontent.XContentType;
 
 import javax.annotation.Nullable;
 
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Objects;
 import java.util.function.Function;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
 /** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. */
 public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
 
@@ -63,11 +59,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     private transient  RuntimeContext runtimeContext;
 
     private SinkMetricData sinkMetricData;
-    private Long dataSize = 0L;
-    private Long rowSize = 0L;
-    private String groupId;
-    private String streamId;
-    private transient AuditImp auditImp;
 
     public RowElasticsearchSinkFunction(
             IndexGenerator indexGenerator,
@@ -94,44 +85,20 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     public void open(RuntimeContext ctx) {
         indexGenerator.open();
         this.runtimeContext = ctx;
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            groupId = inlongMetricArray[0];
-            streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
-            sinkMetricData.registerMetricsForNumBytesOut();
-            sinkMetricData.registerMetricsForNumRecordsOut();
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
-        }
-
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
-        }
-    }
-
-    private void outputMetricForAudit(long size) {
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_OUTPUT,
-                    groupId,
-                    streamId,
-                    System.currentTimeMillis(),
-                    1,
-                    size);
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.NORMAL)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
         }
     }
 
     private void sendMetrics(byte[] document) {
-        if (sinkMetricData.getNumBytesOut() != null) {
-            sinkMetricData.getNumBytesOut().inc(document.length);
-        }
-        if (sinkMetricData.getNumRecordsOut() != null) {
-            sinkMetricData.getNumRecordsOut().inc();
+        if (sinkMetricData != null) {
+            sinkMetricData.invoke(1, document.length);
         }
-        outputMetricForAudit(document.length);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
index 60bcf9332..b18e7e32d 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
@@ -77,7 +77,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 import org.apache.inlong.sort.filesystem.stream.StreamingSink;
 
 import javax.annotation.Nullable;
@@ -157,7 +156,6 @@ public class FileSystemTableSink extends AbstractFileSystemTable
         this.configuredParallelism = tableOptions.get(FileSystemOptions.SINK_PARALLELISM);
         this.inlongMetric = tableOptions.get(INLONG_METRIC);
         this.inlongAudit = tableOptions.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 47516a270..95267c698 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -32,10 +32,9 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
-
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send
@@ -102,19 +101,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
     @Override
     public void open() throws Exception {
         super.open();
-        if (inlongMetric != null) {
-            String[] inLongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inLongMetricArray[0];
-            String streamId = inLongMetricArray[1];
-            String nodeId = inLongMetricArray[2];
-            metricData = new SinkMetricData(
-                    groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(), inlongAudit);
-            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond();
-            metricData.registerMetricsForNumRecordsOutPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .withInlongAudit(inlongAudit)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
     }
 
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
index da14c947a..55d5d57d7 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 import org.apache.inlong.sort.hbase.sink.HBaseDynamicTableSink;
 
 import java.util.HashSet;
@@ -106,7 +105,6 @@ public class HBase2DynamicTableFactory
         HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
         String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = tableOptions.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
         return new HBaseDynamicTableSink(
                 tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral, inlongMetric, inlongAudit);
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 907b758ff..a1e9641d2 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -37,9 +37,9 @@ import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,19 +123,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
         org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
         try {
             this.runtimeContext = getRuntimeContext();
-            if (inlongMetric != null && !inlongMetric.isEmpty()) {
-                String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
-                String groupId = inlongMetricArray[0];
-                String streamId = inlongMetricArray[1];
-                String nodeId = inlongMetricArray[2];
-                sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup(),
-                        inlongAudit);
-                sinkMetricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-                sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
+            MetricOption metricOption = MetricOption.builder()
+                    .withInlongLabels(inlongMetric)
+                    .withInlongAudit(inlongAudit)
+                    .withRegisterMetric(RegisteredMetric.ALL)
+                    .build();
+            if (metricOption != null) {
+                sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
             }
             this.mutationConverter.open();
             this.numPendingRequests = new AtomicLong(0);
@@ -163,14 +157,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
                                     }
                                     try {
                                         flush();
-                                        sinkMetricData.invoke(rowSize, dataSize);
+                                        if (sinkMetricData != null) {
+                                            sinkMetricData.invoke(rowSize, dataSize);
+                                        }
                                         resetStateAfterFlush();
                                     } catch (Exception e) {
-                                        if (sinkMetricData.getDirtyRecords() != null) {
-                                            sinkMetricData.getDirtyRecords().inc(rowSize);
-                                        }
-                                        if (sinkMetricData.getDirtyBytes() != null) {
-                                            sinkMetricData.getDirtyBytes().inc(dataSize);
+                                        if (sinkMetricData != null) {
+                                            sinkMetricData.invokeDirty(rowSize, dataSize);
                                         }
                                         resetStateAfterFlush();
                                         // fail the sink and skip the rest of the items
@@ -236,14 +229,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
                 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
             try {
                 flush();
-                sinkMetricData.invoke(rowSize, dataSize);
+                if (sinkMetricData != null) {
+                    sinkMetricData.invoke(rowSize, dataSize);
+                }
                 resetStateAfterFlush();
             } catch (Exception e) {
-                if (sinkMetricData.getDirtyRecords() != null) {
-                    sinkMetricData.getDirtyRecords().inc(rowSize);
-                }
-                if (sinkMetricData.getDirtyBytes() != null) {
-                    sinkMetricData.getDirtyBytes().inc(dataSize);
+                if (sinkMetricData != null) {
+                    sinkMetricData.invokeDirty(rowSize, dataSize);
                 }
                 resetStateAfterFlush();
                 throw e;
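
Note for readers following the refactor: every sink connector in this patch converges on the same construction pattern, so the per-connector label parsing and manual counter registration above disappear. A minimal sketch of that pattern, outside the patch itself (class and method names here are illustrative; based on the null guard each connector keeps, the sketch assumes MetricOption.builder().build() returns null when no inlong labels are configured):

    import org.apache.flink.metrics.MetricGroup;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
    import org.apache.inlong.sort.base.metric.SinkMetricData;

    public class SinkMetricSetupSketch {
        private SinkMetricData sinkMetricData;

        void initMetrics(String inlongMetric, String inlongAudit, MetricGroup metricGroup) {
            // inlongMetric carries the delimiter-separated groupId/streamId/nodeId labels;
            // inlongAudit carries the audit proxy endpoints and may be null.
            MetricOption metricOption = MetricOption.builder()
                    .withInlongLabels(inlongMetric)
                    .withInlongAudit(inlongAudit)
                    .withRegisterMetric(RegisteredMetric.ALL)
                    .build();
            if (metricOption != null) {
                // RegisteredMetric.ALL drives registration of the counters and meters
                // that were previously registered one by one in each connector.
                sinkMetricData = new SinkMetricData(metricOption, metricGroup);
            }
        }
    }
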
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 5d0ca7bf0..ab2e845f2 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -32,12 +32,11 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 import javax.annotation.Nullable;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
 /**
  * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send
  * file and bucket information to downstream.
@@ -111,19 +110,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
     @Override
     public void open() throws Exception {
         super.open();
-        if (inlongMetric != null) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String inlongGroupId = inlongMetricArray[0];
-            String inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SinkMetricData(
-                    inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
-            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond();
-            metricData.registerMetricsForNumRecordsOutPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
     }
 
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
index 35d3a6c76..7840e3ea7 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
@@ -135,12 +135,11 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         Map<String, String> tableProps = catalogTable.getOptions();
         TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
         SyncRewriteDataFilesActionOption compactOption = new SyncRewriteDataFilesActionOption(tableProps);
-        MetricOption metricOption = null;
-        if (tableProps.containsKey(INLONG_METRIC.key())) {
-            metricOption = new MetricOption(
-                    tableProps.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
-                    tableProps.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()));
-        }
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(tableProps.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()))
+                .withInlongAudit(tableProps.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()))
+                .build();
+
         boolean appendMode = tableProps.containsKey(ICEBERG_IGNORE_ALL_CHANGELOG.key())
                 ? Boolean.parseBoolean(tableProps.get(ICEBERG_IGNORE_ALL_CHANGELOG.key()))
                 : ICEBERG_IGNORE_ALL_CHANGELOG.defaultValue();
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
index 112b6a36b..df0ced054 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
@@ -43,7 +43,7 @@ import java.util.Map;
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
  * Add a table property `write.compact.enable` to support small file compact.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collect inlong metrics and audit
  */
 public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
     private static final Logger LOG = LoggerFactory.getLogger(IcebergTableSink.class);
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
index 0b48fb169..c24d5a8c6 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
@@ -74,7 +74,7 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
  * Add a table property `write.compact.enable` to support small file compact.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit.
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collect inlong metrics and audit.
  */
 public class FlinkSink {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index 9f7cffbcf..bb00b7808 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -30,7 +30,6 @@ import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -76,12 +75,6 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         // Initialize metric
         if (metricOption != null) {
             metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
-            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond();
-            metricData.registerMetricsForNumRecordsOutPerSecond();
         }
     }
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 088622278..37eb2670b 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -46,7 +46,7 @@ import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_A
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collect inlong metrics and audit
  */
 public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 65929bc34..8662722d8 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -71,7 +71,7 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  * Add an option `sink.ignore.changelog` to support insert-only mode without primaryKey.
- * Add option `inlong.metric` and `inlong.audit` to support collect inlong metrics and audit.
+ * Add option `inlong.metric` and `metrics.audit.proxy.hosts` to support collect inlong metrics and audit.
  */
 public class FlinkSink {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 79df9049f..8318c7177 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -28,14 +28,13 @@ import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
@@ -79,19 +78,13 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         this.writer = taskWriterFactory.create();
 
         // Initialize metric
-        if (inlongMetric != null) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String inlongGroupId = inlongMetricArray[0];
-            String inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SinkMetricData(
-                    inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
-            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond();
-            metricData.registerMetricsForNumRecordsOutPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
     }
 
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 66af78c4d..e32d4094b 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -33,7 +33,8 @@ import org.apache.flink.connector.jdbc.utils.JdbcUtils;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,9 +44,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -53,8 +52,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * A JDBC outputFormat that supports batching records before writing records to database.
@@ -80,9 +77,6 @@ public class JdbcBatchingOutputFormat<
     private transient RuntimeContext runtimeContext;
 
     private SinkMetricData sinkMetricData;
-    private String inlongGroupId;
-    private String inlongStreamId;
-    private transient AuditImp auditImp;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
 
@@ -130,22 +124,13 @@ public class JdbcBatchingOutputFormat<
     public void open(int taskNumber, int numTasks) throws IOException {
         super.open(taskNumber, numTasks);
         this.runtimeContext = getRuntimeContext();
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            inlongGroupId = inlongMetricArray[0];
-            inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sinkMetricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, runtimeContext.getMetricGroup());
-            sinkMetricData.registerMetricsForDirtyBytes();
-            sinkMetricData.registerMetricsForDirtyRecords();
-            sinkMetricData.registerMetricsForNumBytesOut();
-            sinkMetricData.registerMetricsForNumRecordsOut();
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
-        }
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
         }
         jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
         if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
@@ -159,20 +144,13 @@ public class JdbcBatchingOutputFormat<
                                     if (!closed) {
                                         try {
                                             flush();
-                                            if (sinkMetricData.getNumRecordsOut() != null) {
-                                                sinkMetricData.getNumRecordsOut().inc(rowSize);
-                                            }
-                                            if (sinkMetricData.getNumBytesOut() != null) {
-                                                sinkMetricData.getNumBytesOut()
-                                                        .inc(dataSize);
+                                            if (sinkMetricData != null) {
+                                                sinkMetricData.invoke(rowSize, dataSize);
                                             }
                                             resetStateAfterFlush();
                                         } catch (Exception e) {
-                                            if (sinkMetricData.getDirtyRecords() != null) {
-                                                sinkMetricData.getDirtyRecords().inc(rowSize);
-                                            }
-                                            if (sinkMetricData.getDirtyBytes() != null) {
-                                                sinkMetricData.getDirtyBytes().inc(dataSize);
+                                            if (sinkMetricData != null) {
+                                                sinkMetricData.invokeDirty(rowSize, dataSize);
                                             }
                                             resetStateAfterFlush();
                                             flushException = e;
@@ -203,46 +181,26 @@ public class JdbcBatchingOutputFormat<
         }
     }
 
-    private void outputMetricForAudit(long length) {
-        if (auditImp != null) {
-            auditImp.add(
-                    AUDIT_SORT_INPUT,
-                    inlongGroupId,
-                    inlongStreamId,
-                    System.currentTimeMillis(),
-                    1,
-                    length);
-        }
-    }
-
     @Override
     public final synchronized void writeRecord(In record) throws IOException {
         checkFlushException();
 
         rowSize++;
         dataSize = dataSize + record.toString().getBytes(StandardCharsets.UTF_8).length;
-        outputMetricForAudit(dataSize);
         try {
             addToBatch(record, jdbcRecordExtractor.apply(record));
             batchCount++;
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (sinkMetricData.getNumRecordsOut() != null) {
-                    sinkMetricData.getNumRecordsOut().inc(rowSize);
-                }
-                if (sinkMetricData.getNumBytesOut() != null) {
-                    sinkMetricData.getNumBytesOut()
-                            .inc(dataSize);
+                if (sinkMetricData != null) {
+                    sinkMetricData.invoke(rowSize, dataSize);
                 }
                 resetStateAfterFlush();
             }
         } catch (Exception e) {
-            if (sinkMetricData.getDirtyRecords() != null) {
-                sinkMetricData.getDirtyRecords().inc(rowSize);
-            }
-            if (sinkMetricData.getDirtyBytes() != null) {
-                sinkMetricData.getDirtyBytes().inc(dataSize);
+            if (sinkMetricData != null) {
+                sinkMetricData.invokeDirty(rowSize, dataSize);
             }
             resetStateAfterFlush();
             throw new IOException("Writing records to JDBC failed.", e);
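
The getter-based increments (getNumRecordsOut().inc(...), getDirtyRecords().inc(...)) give way to the invoke/invokeDirty pair, so the flush path only decides whether a batch was clean or dirty. A small illustrative sketch of that accounting, not part of the patch (the helper class and method are hypothetical):

    import org.apache.inlong.sort.base.metric.SinkMetricData;

    /** Sketch: how a flushed batch is accounted for against the sink metrics. */
    final class BatchMetricsSketch {
        static void recordFlushOutcome(SinkMetricData metrics, long rows, long bytes, boolean failed) {
            if (metrics == null) {
                return; // metrics are optional; nothing was configured for this sink
            }
            if (failed) {
                metrics.invokeDirty(rows, bytes);  // counted as dirty records/bytes
            } else {
                metrics.invoke(rows, bytes);       // counted as numRecordsOut/numBytesOut
            }
        }
    }
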
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
index 0d0ab4544..b655a10cd 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
@@ -19,7 +19,6 @@
 package org.apache.inlong.sort.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -63,10 +62,10 @@ import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWat
 import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -74,10 +73,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -90,7 +87,6 @@ import static org.apache.flink.streaming.connectors.kafka.internals.metrics.Kafk
 import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
@@ -831,36 +827,20 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
     @Override
     public void run(SourceContext<T> sourceContext) throws Exception {
-
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            AuditImp auditImp = null;
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                auditImp = AuditImp.getInstance();
-            }
-            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(),
-                    auditImp);
-            ThreadSafeCounter recordsInCounter = new ThreadSafeCounter();
-            ThreadSafeCounter bytesInCounter = new ThreadSafeCounter();
-            if (metricState != null) {
-                recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
-                bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
-            }
-            sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
-            sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
-            sourceMetricData.registerMetricsForNumRecordsInForMeter(new ThreadSafeCounter());
-            sourceMetricData.registerMetricsForNumBytesInForMeter(new ThreadSafeCounter());
-            sourceMetricData.registerMetricsForNumBytesInPerSecond();
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
-            if (this.deserializer instanceof DynamicKafkaDeserializationSchema) {
-                DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema =
-                        (DynamicKafkaDeserializationSchema) deserializer;
-                dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
-            }
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(inlongAudit)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
+                .build();
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+        if (this.deserializer instanceof DynamicKafkaDeserializationSchema) {
+            DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema =
+                    (DynamicKafkaDeserializationSchema) deserializer;
+            dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
         }
 
         if (subscribedPartitionsToStartOffsets == null) {
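
On restore, the consumer no longer pre-increments ThreadSafeCounters by hand; the snapshotted MetricState is fed into the builder and the counters start from the checkpointed values. A rough sketch of that wiring, with names taken from the patch and an illustrative helper class (not part of the patch):

    import org.apache.flink.metrics.MetricGroup;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
    import org.apache.inlong.sort.base.metric.MetricState;
    import org.apache.inlong.sort.base.metric.SourceMetricData;

    import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
    import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;

    final class SourceMetricRestoreSketch {
        static SourceMetricData restore(String labels, String audit, MetricState state, MetricGroup group) {
            MetricOption option = MetricOption.builder()
                    .withInlongLabels(labels)
                    .withInlongAudit(audit)
                    .withRegisterMetric(RegisteredMetric.ALL)
                    // seed counters with the values snapshotted at the last checkpoint, if any
                    .withInitRecords(state != null ? state.getMetricValue(NUM_RECORDS_IN) : 0L)
                    .withInitBytes(state != null ? state.getMetricValue(NUM_BYTES_IN) : 0L)
                    .build();
            return option != null ? new SourceMetricData(option, group) : null;
        }
    }
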
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index 3f0902c0c..d3ca629cb 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -57,10 +57,10 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
@@ -97,7 +97,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -243,18 +242,6 @@ public class FlinkKafkaProducer<IN>
      */
     @Nullable
     protected transient volatile Exception asyncException;
-    /**
-     * audit implement
-     */
-    private transient AuditImp auditImp;
-    /**
-     * inLong groupId
-     */
-    private String inlongGroupId;
-    /**
-     * inLong streamId
-     */
-    private String inlongStreamId;
     /**
      * sink metric data
      */
@@ -914,25 +901,15 @@ public class FlinkKafkaProducer<IN>
                     RuntimeContextInitializationContextAdapters.serializationAdapter(
                             getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
         }
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            inlongGroupId = inlongMetricArray[0];
-            inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup(),
-                    auditHostAndPorts);
-            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond();
-            metricData.registerMetricsForNumRecordsOutPerSecond();
-        }
-        if (metricState != null && metricData != null) {
-            metricData.getNumBytesOut().inc(metricState.getMetricValue(NUM_BYTES_OUT));
-            metricData.getNumRecordsOut().inc(metricState.getMetricValue(NUM_RECORDS_OUT));
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, ctx.getMetricGroup());
         }
         super.open(configuration);
     }
@@ -945,8 +922,7 @@ public class FlinkKafkaProducer<IN>
 
     private void sendDirtyMetrics(Long rowSize, Long dataSize) {
         if (metricData != null) {
-            metricData.getDirtyRecords().inc(rowSize);
-            metricData.getDirtyBytes().inc(dataSize);
+            metricData.invokeDirty(rowSize, dataSize);
         }
     }
 
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index c6b5c11a9..13e3d2103 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -120,7 +120,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
         if (keyDeserialization == null && !hasMetadata) {
             valueDeserialization.deserialize(record.value(), collector);
             // output metrics
-            outputMetrics(record);
+            if (metricData != null) {
+                metricData.outputMetrics(1, record.value().length);
+            }
             return;
         }
 
@@ -140,7 +142,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
         } else {
             valueDeserialization.deserialize(record.value(), outputCollector);
             // output metrics
-            outputMetrics(record);
+            if (metricData != null) {
+                metricData.outputMetrics(1, record.value().length);
+            }
         }
 
         keyCollector.buffer.clear();
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 42bb53732..932e249d5 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.spi.OffsetCommitPolicy;
 import io.debezium.heartbeat.Heartbeat;
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -58,7 +57,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -68,9 +68,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,7 +79,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -416,21 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            AuditImp auditImp = null;
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                auditImp = AuditImp.getInstance();
-            }
-            metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
-            metricData.registerMetricsForNumRecordsIn();
-            metricData.registerMetricsForNumBytesIn();
-            metricData.registerMetricsForNumBytesInPerSecond();
-            metricData.registerMetricsForNumRecordsInPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(inlongAudit)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -470,8 +459,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                 if (metricData != null) {
-                                    metricData.outputMetrics(1L,
-                                            record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    metricData.outputMetricsWithEstimate(record.value());
                                 }
                                 deserializer.deserialize(record, out);
                             }
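
The Debezium-based sources stop measuring each record by stringifying it and instead hand the raw value to outputMetricsWithEstimate, which derives the byte size estimate inside SourceMetricData. A sketch of the metered deserialization wrapper, for illustration only (the helper class and functional interface here are hypothetical):

    import org.apache.flink.util.Collector;
    import org.apache.inlong.sort.base.metric.SourceMetricData;
    import org.apache.kafka.connect.source.SourceRecord;

    /** Sketch: count a change record before delegating to the real deserializer. */
    final class MeteredDeserializeSketch {
        static <T> void deserialize(SourceMetricData metrics, SourceRecord record,
                DeserializeFn<T> delegate, Collector<T> out) throws Exception {
            if (metrics != null) {
                // one record; the byte size is estimated from the record value by SourceMetricData
                metrics.outputMetricsWithEstimate(record.value());
            }
            delegate.deserialize(record, out);
        }

        interface DeserializeFn<T> {
            void deserialize(SourceRecord record, Collector<T> out) throws Exception;
        }
    }
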
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index 919c227e4..20f20b65f 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 
 import java.time.ZoneId;
 import java.util.HashSet;
@@ -226,7 +225,6 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                         : ZoneId.of(zoneId);
         final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         final String inlongAudit = config.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index e7084d2fa..1ac48f97a 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -25,7 +25,6 @@ import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.spi.OffsetCommitPolicy;
 import io.debezium.heartbeat.Heartbeat;
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -47,7 +46,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher;
@@ -66,9 +66,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -77,7 +75,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
 
@@ -416,23 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            AuditImp auditImp = null;
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                auditImp = AuditImp.getInstance();
-            }
-            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
-            sourceMetricData.registerMetricsForNumRecordsIn();
-            sourceMetricData.registerMetricsForNumBytesIn();
-            sourceMetricData.registerMetricsForNumBytesInForMeter();
-            sourceMetricData.registerMetricsForNumRecordsInForMeter();
-            sourceMetricData.registerMetricsForNumBytesInPerSecond();
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(inlongAudit)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
 
         properties.setProperty("name", "engine");
@@ -473,8 +460,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                 if (sourceMetricData != null) {
-                                    sourceMetricData.outputMetrics(1L,
-                                            record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    sourceMetricData.outputMetricsWithEstimate(record.value());
                                 }
                                 deserializer.deserialize(record, out);
                             }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index dcb510044..ea102f429 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.cdc.mysql.source;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -36,8 +35,8 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.mysql.MySqlValidator;
 import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
@@ -62,12 +61,9 @@ import org.apache.inlong.sort.cdc.mysql.table.StartupMode;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.function.Supplier;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.discoverCapturedTables;
 import static org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection;
 
@@ -142,28 +138,18 @@ public class MySqlSource<T>
         final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext);
         final MySqlSourceReaderMetrics sourceReaderMetrics =
                 new MySqlSourceReaderMetrics(metricGroup);
-        sourceReaderMetrics.registerMetrics();
-        MySqlSourceReaderContext mySqlSourceReaderContext =
-                new MySqlSourceReaderContext(readerContext);
         // create source config for the given subtask (e.g. unique server id)
         MySqlSourceConfig sourceConfig =
                 configFactory.createConfig(readerContext.getIndexOfSubtask());
-        String inlongMetric = sourceConfig.getInlongMetric();
-        String inlongAudit = sourceConfig.getInlongAudit();
-        if (StringUtils.isNotEmpty(inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            sourceReaderMetrics.setInlongGroupId(inlongMetricArray[0]);
-            sourceReaderMetrics.setInlongSteamId(inlongMetricArray[1]);
-            sourceReaderMetrics.setNodeId(inlongMetricArray[2]);
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                sourceReaderMetrics.setAuditImp(AuditImp.getInstance());
-            }
-            sourceReaderMetrics.registerMetricsForNumBytesIn(Constants.NUM_BYTES_IN);
-            sourceReaderMetrics.registerMetricsForNumRecordsIn(Constants.NUM_RECORDS_IN);
-            sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(Constants.NUM_BYTES_IN_PER_SECOND);
-            sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(Constants.NUM_RECORDS_IN_PER_SECOND);
-        }
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(sourceConfig.getInlongMetric())
+                .withInlongAudit(sourceConfig.getInlongAudit())
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        sourceReaderMetrics.registerMetrics(metricOption);
+        MySqlSourceReaderContext mySqlSourceReaderContext =
+                new MySqlSourceReaderContext(readerContext);
+
         FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
                 new FutureCompletingBlockingQueue<>();
         Supplier<MySqlSplitReader> splitReaderSupplier =
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index b301ed572..45c81e560 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -18,13 +18,10 @@
 
 package org.apache.inlong.sort.cdc.mysql.source.metrics;
 
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
 
 /**
@@ -53,56 +50,21 @@ public class MySqlSourceReaderMetrics {
      */
     private volatile long emitDelay = 0L;
 
-    private Counter numRecordsIn;
-    private Counter numBytesIn;
-    private Meter numRecordsInPerSecond;
-    private Meter numBytesInPerSecond;
-    private static Integer TIME_SPAN_IN_SECONDS = 60;
-    private static String STREAM_ID = "streamId";
-    private static String GROUP_ID = "groupId";
-    private static String NODE_ID = "nodeId";
-    private String inlongGroupId;
-    private String inlongSteamId;
-    private String nodeId;
-    private AuditImp auditImp;
+    private SourceMetricData sourceMetricData;
 
     public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
     }
 
-    public void registerMetrics() {
+    public void registerMetrics(MetricOption metricOption) {
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+        }
         metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>) this::getFetchDelay);
         metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>) this::getEmitDelay);
         metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime);
     }
 
-    public void registerMetricsForNumRecordsIn(String metricName) {
-        numRecordsIn =
-                metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
-                        .addGroup(NODE_ID, this.nodeId)
-                        .counter(metricName);
-    }
-
-    public void registerMetricsForNumBytesIn(String metricName) {
-        numBytesIn =
-                metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
-                        .addGroup(NODE_ID, this.nodeId)
-                        .counter(metricName);
-    }
-
-    public void registerMetricsForNumRecordsInPerSecond(String metricName) {
-        numRecordsInPerSecond =
-                metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
-                        .addGroup(NODE_ID, nodeId)
-                        .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS));
-    }
-
-    public void registerMetricsForNumBytesInPerSecond(String metricName) {
-        numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
-                .addGroup(NODE_ID, this.nodeId)
-                .meter(metricName, new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS));
-    }
-
     public long getFetchDelay() {
         return fetchDelay;
     }
@@ -131,77 +93,9 @@ public class MySqlSourceReaderMetrics {
         this.emitDelay = emitDelay;
     }
 
-    public Counter getNumRecordsIn() {
-        return numRecordsIn;
-    }
-
-    public Counter getNumBytesIn() {
-        return numBytesIn;
-    }
-
-    public Meter getNumRecordsInPerSecond() {
-        return numRecordsInPerSecond;
-    }
-
-    public Meter getNumBytesInPerSecond() {
-        return numBytesInPerSecond;
-    }
-
-    public String getInlongGroupId() {
-        return inlongGroupId;
-    }
-
-    public String getInlongSteamId() {
-        return inlongSteamId;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public void setInlongGroupId(String inlongGroupId) {
-        this.inlongGroupId = inlongGroupId;
-    }
-
-    public void setInlongSteamId(String inlongSteamId) {
-        this.inlongSteamId = inlongSteamId;
-    }
-
-    public void setNodeId(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    public AuditImp getAuditImp() {
-        return auditImp;
-    }
-
-    public void setAuditImp(AuditImp auditImp) {
-        this.auditImp = auditImp;
-    }
-
     public void outputMetrics(long rowCountSize, long rowDataSize) {
-        outputMetricForFlink(rowCountSize, rowDataSize);
-        outputMetricForAudit(rowCountSize, rowDataSize);
-    }
-
-    public void outputMetricForAudit(long rowCountSize, long rowDataSize) {
-        if (this.auditImp != null) {
-            this.auditImp.add(
-                    Constants.AUDIT_SORT_INPUT,
-                    getInlongGroupId(),
-                    getInlongSteamId(),
-                    System.currentTimeMillis(),
-                    rowCountSize,
-                    rowDataSize);
-        }
-    }
-
-    public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
-        if (this.numBytesIn != null) {
-            numBytesIn.inc(rowDataSize);
-        }
-        if (this.numRecordsIn != null) {
-            this.numRecordsIn.inc(rowCountSize);
+        if (sourceMetricData != null) {
+            sourceMetricData.outputMetrics(rowCountSize, rowDataSize);
         }
     }
 }
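
With the reader metrics now delegating to SourceMetricData, the MySQL source only builds one MetricOption from its source config and passes it through registerMetrics. A rough sketch of that wiring, using the names from the patch (the wrapper class is illustrative; when the option is null only the lag/idle gauges are registered):

    import org.apache.flink.metrics.MetricGroup;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
    import org.apache.inlong.sort.cdc.mysql.source.metrics.MySqlSourceReaderMetrics;

    final class ReaderMetricsWiringSketch {
        static MySqlSourceReaderMetrics wire(MetricGroup group, String inlongMetric, String inlongAudit) {
            MySqlSourceReaderMetrics readerMetrics = new MySqlSourceReaderMetrics(group);
            MetricOption option = MetricOption.builder()
                    .withInlongLabels(inlongMetric)   // from MySqlSourceConfig#getInlongMetric()
                    .withInlongAudit(inlongAudit)     // from MySqlSourceConfig#getInlongAudit()
                    .withRegisterMetric(RegisteredMetric.ALL)
                    .build();
            readerMetrics.registerMetrics(option);    // null option: gauges only, no inlong counters
            return readerMetrics;
        }
    }
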
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index c8a70b8bd..b4610d959 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 import org.apache.inlong.sort.cdc.debezium.table.DebeziumOptions;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions;
 import org.apache.inlong.sort.cdc.mysql.source.config.ServerIdRange;
@@ -124,7 +123,6 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
         final ReadableConfig config = helper.getOptions();
         final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         final String inlongAudit = config.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
         final String hostname = config.get(HOSTNAME);
         final String username = config.get(USERNAME);
         final String password = config.get(PASSWORD);
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
index 1458693fb..869a0a1cd 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
@@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.spi.OffsetCommitPolicy;
 import io.debezium.heartbeat.Heartbeat;
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -58,7 +57,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -68,9 +68,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,7 +79,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -416,21 +413,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            AuditImp auditImp = null;
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                auditImp = AuditImp.getInstance();
-            }
-            metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
-            metricData.registerMetricsForNumRecordsIn();
-            metricData.registerMetricsForNumBytesIn();
-            metricData.registerMetricsForNumBytesInPerSecond();
-            metricData.registerMetricsForNumRecordsInPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(inlongAudit)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -470,8 +459,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                 if (metricData != null) {
-                                    metricData.outputMetrics(1L,
-                                            record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    metricData.outputMetricsWithEstimate(record.value());
                                 }
                                 deserializer.deserialize(record, out);
                             }
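
outputMetricsWithEstimate removes the per-record byte arithmetic from every deserialize callback. Its body is not shown in this diff; a plausible sketch, inferred from the code it replaces, is:

    // Plausible shape of SourceMetricData#outputMetricsWithEstimate; the actual
    // implementation lives in the sort-connector-base module.
    public void outputMetricsWithEstimate(Object data) {
        long dataSize = data == null
                ? 0L
                : data.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
        // one record plus its estimated size; audit reporting happens inside outputMetrics
        outputMetrics(1L, dataSize);
    }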
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index 0b4471ae3..d020d03f9 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -114,7 +113,6 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = config.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
         return new OracleTableSource(
                 physicalSchema,
                 port,
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index 4ef40bcc8..2ccf92421 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -36,7 +36,6 @@ import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.spi.OffsetCommitPolicy;
 import io.debezium.heartbeat.Heartbeat;
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -51,7 +50,6 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -60,7 +58,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
@@ -72,9 +71,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -85,7 +82,6 @@ import java.util.concurrent.TimeUnit;
 
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
@@ -441,29 +437,15 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String groupId = inlongMetricArray[0];
-            String streamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            AuditImp auditImp = null;
-            if (inlongAudit != null) {
-                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
-                auditImp = AuditImp.getInstance();
-            }
-            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
-            SimpleCounter recordsInCounter = new SimpleCounter();
-            SimpleCounter bytesInCounter = new SimpleCounter();
-            if (metricState != null) {
-                recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
-                bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
-            }
-            sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
-            sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
-            sourceMetricData.registerMetricsForNumRecordsInForMeter();
-            sourceMetricData.registerMetricsForNumBytesInForMeter();
-            sourceMetricData.registerMetricsForNumBytesInPerSecond();
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(inlongAudit)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -504,8 +486,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                 deserializer.deserialize(record, out);
                                 if (sourceMetricData != null) {
-                                    sourceMetricData.outputMetrics(1L,
-                                            record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                    sourceMetricData.outputMetricsWithEstimate(record.value());
                                 }
                             }
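
Unlike the Oracle source above, the PostgreSQL source also restores previously checkpointed totals: the MetricState recovered in initializeState seeds the counters through withInitRecords/withInitBytes, so numRecordsIn/numBytesIn continue from their last values instead of restarting at zero. A condensed sketch of that wiring (the helper class and method names are illustrative; the MetricOption calls are the ones used in the hunk above):

    import org.apache.flink.metrics.MetricGroup;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
    import org.apache.inlong.sort.base.metric.MetricState;
    import org.apache.inlong.sort.base.metric.SourceMetricData;

    import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
    import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;

    /** Illustrative helper showing how a restored MetricState seeds the source counters. */
    public class SourceMetricRestoreSketch {

        public static SourceMetricData build(String inlongMetric, String inlongAudit,
                MetricState metricState, MetricGroup metricGroup) {
            MetricOption metricOption = MetricOption.builder()
                    .withInlongLabels(inlongMetric)
                    .withInlongAudit(inlongAudit)
                    // seed with the totals captured at the last successful checkpoint
                    .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                    .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
                    .withRegisterMetric(RegisteredMetric.ALL)
                    .build();
            // build() is assumed to return null when no inlong labels were configured
            return metricOption != null ? new SourceMetricData(metricOption, metricGroup) : null;
        }
    }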
 
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
index a886f8aef..02c624290 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -127,7 +126,6 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = config.get(INLONG_AUDIT);
-        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
         return new PostgreSQLTableSource(
                 physicalSchema,
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java
deleted file mode 100644
index 18fd19955..000000000
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.inlong.sort.pulsar.table;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-public class Constants {
-
-    public static final ConfigOption<String> INLONG_METRIC =
-        ConfigOptions.key("inlong.metric")
-            .stringType()
-            .defaultValue("")
-            .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
-}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 6d88a303b..bd4ecaaee 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -33,7 +33,8 @@ import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
 import org.apache.pulsar.client.api.Message;
@@ -44,14 +45,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
 /**
  * A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}.
  */
@@ -76,12 +72,6 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
     private String inlongMetric;
     private String auditHostAndPorts;
 
-    private AuditImp auditImp;
-
-    private String inlongGroupId;
-
-    private String inlongStreamId;
-
     DynamicPulsarDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -120,23 +110,14 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
         }
         valueDeserialization.open(context);
 
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            inlongGroupId = inlongMetricArray[0];
-            inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, getMetricGroup(context));
-            sourceMetricData.registerMetricsForNumBytesIn();
-            sourceMetricData.registerMetricsForNumBytesInPerSecond();
-            sourceMetricData.registerMetricsForNumRecordsIn();
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, getMetricGroup(context));
         }
-
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
-        }
-
     }
 
     /**
@@ -191,7 +172,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
             valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
-                sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length);
+                sourceMetricData.outputMetricsWithEstimate(inputRow);
                 collector.collect(inputRow);
             }));
             return;
@@ -212,7 +193,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             outputCollector.collect(null);
         } else {
             valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
-                sourceMetricData.outputMetrics(1L, inputRow.toString().getBytes(StandardCharsets.UTF_8).length);
+                sourceMetricData.outputMetricsWithEstimate(inputRow);
                 outputCollector.collect(inputRow);
             }));
         }
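
In the Pulsar path the per-record accounting sits inside the CallbackCollector that wraps the value deserializer's output. A condensed, illustrative fragment of that pattern (the class name and the null guard are added here for clarity; CallbackCollector and SourceMetricData are the classes used in the hunk above):

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.util.Collector;
    import org.apache.inlong.sort.base.metric.SourceMetricData;
    import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;

    /** Illustrative value-only deserialization path; not part of the patch. */
    public class PulsarMetricCollectorSketch {

        private final DeserializationSchema<RowData> valueDeserialization;
        private final SourceMetricData sourceMetricData; // null when no labels are configured

        PulsarMetricCollectorSketch(DeserializationSchema<RowData> valueDeserialization,
                SourceMetricData sourceMetricData) {
            this.valueDeserialization = valueDeserialization;
            this.sourceMetricData = sourceMetricData;
        }

        void deserializeValueOnly(byte[] payload, Collector<RowData> collector) throws Exception {
            valueDeserialization.deserialize(payload, new CallbackCollector<>(inputRow -> {
                if (sourceMetricData != null) {
                    // one record; byte size estimated from the row's string form
                    sourceMetricData.outputMetricsWithEstimate(inputRow);
                }
                collector.collect(inputRow);
            }));
        }
    }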
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index bf74dc8d2..5a3ddfe51 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -78,7 +78,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
 /**
  * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index ff0361b16..e4793dec7 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -68,7 +68,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
 import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
 /**
  * Upsert-Pulsar factory.
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
index c28115846..16ee84362 100644
--- a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.cdc.sqlserver.table;
 
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.Validator;
@@ -42,9 +41,7 @@ import io.debezium.heartbeat.Heartbeat;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -54,7 +51,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -76,8 +72,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -217,14 +213,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     private SourceMetricData metricData;
 
-    private String inlongGroupId;
-
     private String auditHostAndPorts;
 
-    private String inlongStreamId;
-
-    private transient AuditImp auditImp;
-
     // ---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
@@ -412,20 +402,13 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
-        if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            inlongGroupId = inlongMetricArray[0];
-            inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, metricGroup);
-            metricData.registerMetricsForNumRecordsIn();
-            metricData.registerMetricsForNumBytesIn();
-            metricData.registerMetricsForNumBytesInPerSecond();
-            metricData.registerMetricsForNumRecordsInPerSecond();
-        }
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SourceMetricData(metricOption, metricGroup);
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
@@ -464,7 +447,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                         new DebeziumDeserializationSchema<T>() {
                             @Override
                             public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
-                                outputMetrics(record);
+                                if (metricData != null) {
+                                    metricData.outputMetricsWithEstimate(record.value());
+                                }
                                 deserializer.deserialize(record, out);
                             }
 
@@ -502,23 +487,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         debeziumChangeFetcher.runFetchLoop();
     }
 
-    private void outputMetrics(SourceRecord record) {
-        if (metricData != null) {
-            metricData.getNumRecordsIn().inc(1L);
-            metricData.getNumBytesIn()
-                    .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
-        }
-        if (auditImp != null) {
-            auditImp.add(
-                Constants.AUDIT_SORT_INPUT,
-                inlongGroupId,
-                inlongStreamId,
-                System.currentTimeMillis(),
-                1,
-                record.value().toString().getBytes(StandardCharsets.UTF_8).length);
-        }
-    }
-
     @Override
     public void notifyCheckpointComplete(long checkpointId) {
         if (!debeziumStarted) {
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 3cc166601..53a8e217f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.parser.impl;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.function.EncryptFunction;
@@ -64,6 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Flink sql parse handler
@@ -149,7 +151,7 @@ public class FlinkSqlParser implements Parser {
         Preconditions.checkNotNull(streamInfo.getRelations(), "relations is null");
         Preconditions.checkState(!streamInfo.getRelations().isEmpty(), "relations is empty");
         log.info("start parse stream, streamId:{}", streamInfo.getStreamId());
-        // Inject the `inlong.metric` for ExtractNode or LoadNode
+        // Inject the metric option for ExtractNode or LoadNode
         injectInlongMetric(streamInfo);
         Map<String, Node> nodeMap = new HashMap<>(streamInfo.getNodes().size());
         streamInfo.getNodes().forEach(s -> {
@@ -169,7 +171,7 @@ public class FlinkSqlParser implements Parser {
     }
 
     /**
-     * Inject the `inlong.metric` for ExtractNode or LoadNode
+     * Inject the metric option for ExtractNode or LoadNode
      *
      * @param streamInfo The encapsulation of nodes and node relations
      */
@@ -183,16 +185,19 @@ public class FlinkSqlParser implements Parser {
                 } else if (node instanceof ExtractNode) {
                     ((ExtractNode) node).setProperties(properties);
                 } else {
-                    throw new UnsupportedOperationException(String.format("Unsupported inlong metric for: %s",
-                            node.getClass().getSimpleName()));
+                    throw new UnsupportedOperationException(String.format(
+                            "Unsupported inlong group stream node for: %s", node.getClass().getSimpleName()));
                 }
             }
-            properties.put(InlongMetric.METRIC_KEY,
-                    String.format(InlongMetric.METRIC_VALUE_FORMAT, groupInfo.getGroupId(),
-                            streamInfo.getStreamId(), node.getId()));
-            if (StringUtils.isNotEmpty(groupInfo.getProperties().get(InlongMetric.AUDIT_KEY))) {
-                properties.put(InlongMetric.AUDIT_KEY,
-                        groupInfo.getProperties().get(InlongMetric.AUDIT_KEY));
+            properties.put(Constants.METRICS_LABELS.key(),
+                    Stream.of(Constants.GROUP_ID + "=" + groupInfo.getGroupId(),
+                                    Constants.STREAM_ID + "=" + streamInfo.getStreamId(),
+                                    Constants.NODE_ID + "=" + node.getId())
+                            .collect(Collectors.joining("&")));
+            // METRICS_AUDIT_PROXY_HOSTS depends on INLONG_GROUP_STREAM_NODE
+            if (StringUtils.isNotEmpty(groupInfo.getProperties().get(Constants.METRICS_AUDIT_PROXY_HOSTS.key()))) {
+                properties.put(Constants.METRICS_AUDIT_PROXY_HOSTS.key(),
+                        groupInfo.getProperties().get(Constants.METRICS_AUDIT_PROXY_HOSTS.key()));
             }
         });
     }
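
After this change the injected option is a key=value label list joined with '&' rather than the positional `groupId&streamId&nodeId` string, which is what MetricOption.withInlongLabels consumes on the connector side. A short illustration of the value written under Constants.METRICS_LABELS; the literal key names below are assumptions standing in for Constants.GROUP_ID, STREAM_ID and NODE_ID:

    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class MetricLabelSketch {

        public static void main(String[] args) {
            // mirrors the joining logic in FlinkSqlParser#injectInlongMetric
            String labels = Stream.of(
                            "groupId=test_group",
                            "streamId=test_stream",
                            "nodeId=load_node_1")
                    .collect(Collectors.joining("&"));
            // prints: groupId=test_group&streamId=test_stream&nodeId=load_node_1
            System.out.println(labels);
        }
    }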


[inlong] 01/02: [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit aaf175ba471fdadd09f11f31cb29ac0d69e42de7
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Mon Sep 19 11:37:57 2022 +0800

    [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
---
 .../org/apache/inlong/sort/base/Constants.java     |    4 +
 .../inlong/sort/base/metric/MetricState.java       |    8 +
 .../inlong/sort/base/metric/SinkMetricData.java    |   82 +-
 .../inlong/sort/base/util/MetricStateUtils.java    |   24 +
 inlong-sort/sort-connectors/kafka/pom.xml          |    6 +
 .../inlong/sort/kafka/FlinkKafkaConsumer.java      |  352 +++++
 .../inlong/sort/kafka/FlinkKafkaConsumerBase.java  | 1350 ++++++++++++++++++++
 .../inlong/sort/kafka/FlinkKafkaProducer.java      |   69 +-
 .../table/DynamicKafkaDeserializationSchema.java   |   61 +-
 .../sort/kafka/table/KafkaDynamicSource.java       |   82 +-
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |    2 +
 licenses/inlong-sort-connectors/LICENSE            |   12 +
 12 files changed, 1933 insertions(+), 119 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index b7bf91ef9..18ff408f2 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -37,6 +37,10 @@ public final class Constants {
 
     public static final String NUM_RECORDS_OUT = "numRecordsOut";
 
+    public static final String NUM_BYTES_OUT_FOR_METER = "numBytesOutForMeter";
+
+    public static final String NUM_RECORDS_OUT_FOR_METER = "numRecordsOutForMeter";
+
     public static final String NUM_BYTES_OUT_PER_SECOND = "numBytesOutPerSecond";
 
     public static final String NUM_RECORDS_OUT_PER_SECOND = "numRecordsOutPerSecond";
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
index 9240c0c8a..604800ccf 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
@@ -62,4 +62,12 @@ public class MetricState implements Serializable {
         }
         return 0L;
     }
+
+    @Override
+    public String toString() {
+        return "MetricState{"
+                + "subtaskIndex=" + subtaskIndex
+                + ", metrics=" + metrics.toString()
+                + '}';
+    }
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 67b47657e..4073ddd44 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -34,8 +34,10 @@ import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
 
 /**
@@ -50,6 +52,8 @@ public class SinkMetricData implements MetricData {
     private AuditImp auditImp;
     private Counter numRecordsOut;
     private Counter numBytesOut;
+    private Counter numRecordsOutForMeter;
+    private Counter numBytesOutForMeter;
     private Counter dirtyRecords;
     private Counter dirtyBytes;
     private Meter numRecordsOutPerSecond;
@@ -76,6 +80,43 @@ public class SinkMetricData implements MetricData {
         }
     }
 
+    /**
+     * Default counter is {@link SimpleCounter}.
+     * groupId, streamId and nodeId are label values; users can use them to filter metric data when a metric
+     * reporter such as Prometheus is enabled.
+     */
+    public void registerMetricsForNumRecordsOutForMeter() {
+        registerMetricsForNumRecordsOutForMeter(new SimpleCounter());
+    }
+
+    /**
+     * Users can supply a custom counter that extends {@link Counter}.
+     * groupId, streamId and nodeId are label values; users can use them to filter metric data when a metric
+     * reporter such as Prometheus is enabled.
+     */
+    public void registerMetricsForNumRecordsOutForMeter(Counter counter) {
+        numRecordsOutForMeter = registerCounter(NUM_RECORDS_OUT_FOR_METER, counter);
+    }
+
+    /**
+     * Default counter is {@link SimpleCounter}.
+     * groupId, streamId and nodeId are label values; users can use them to filter metric data when a metric
+     * reporter such as Prometheus is enabled.
+     */
+    public void registerMetricsForNumBytesOutForMeter() {
+        registerMetricsForNumBytesOutForMeter(new SimpleCounter());
+
+    }
+
+    /**
+     * Users can supply a custom counter that extends {@link Counter}.
+     * groupId, streamId and nodeId are label values; users can use them to filter metric data when a metric
+     * reporter such as Prometheus is enabled.
+     */
+    public void registerMetricsForNumBytesOutForMeter(Counter counter) {
+        numBytesOutForMeter = registerCounter(NUM_BYTES_OUT_FOR_METER, counter);
+    }
+
     /**
      * Default counter is {@link SimpleCounter}
      * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
@@ -114,11 +155,11 @@ public class SinkMetricData implements MetricData {
     }
 
     public void registerMetricsForNumRecordsOutPerSecond() {
-        numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, this.numRecordsOut);
+        numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, this.numRecordsOutForMeter);
     }
 
     public void registerMetricsForNumBytesOutPerSecond() {
-        numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, this.numBytesOut);
+        numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, this.numBytesOutForMeter);
     }
 
     public void registerMetricsForDirtyRecords() {
@@ -191,10 +232,20 @@ public class SinkMetricData implements MetricData {
         return nodeId;
     }
 
+    public Counter getNumRecordsOutForMeter() {
+        return numRecordsOutForMeter;
+    }
+
+    public Counter getNumBytesOutForMeter() {
+        return numBytesOutForMeter;
+    }
+
     public void invokeWithEstimate(Object o) {
         long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
-        getNumRecordsOut().inc();
-        getNumBytesOut().inc(size);
+        this.numRecordsOut.inc();
+        this.numBytesOut.inc(size);
+        this.numRecordsOutForMeter.inc();
+        this.numBytesOutForMeter.inc(size);
         if (auditImp != null) {
             auditImp.add(
                     Constants.AUDIT_SORT_OUTPUT,
@@ -207,8 +258,10 @@ public class SinkMetricData implements MetricData {
     }
 
     public void invoke(long rowCount, long rowSize) {
-        getNumRecordsOut().inc(rowCount);
-        getNumBytesOut().inc(rowSize);
+        this.numRecordsOut.inc(rowCount);
+        this.numBytesOut.inc(rowSize);
+        this.numRecordsOutForMeter.inc(rowCount);
+        this.numBytesOutForMeter.inc(rowSize);
         if (auditImp != null) {
             auditImp.add(
                     Constants.AUDIT_SORT_OUTPUT,
@@ -219,4 +272,21 @@ public class SinkMetricData implements MetricData {
                     rowSize);
         }
     }
+
+    @Override
+    public String toString() {
+        return "SinkMetricData{"
+                + "groupId='" + groupId + '\''
+                + ", streamId='" + streamId + '\''
+                + ", nodeId='" + nodeId + '\''
+                + ", numRecordsOut=" + numRecordsOut.getCount()
+                + ", numBytesOut=" + numBytesOut.getCount()
+                + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
+                + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
+                + ", dirtyRecords=" + dirtyRecords.getCount()
+                + ", dirtyBytes=" + dirtyBytes.getCount()
+                + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
+                + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
+                + '}';
+    }
 }
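
The new *ForMeter counters exist so the per-second meters are no longer driven by numRecordsOut/numBytesOut directly: those counters can be pre-loaded from restored MetricState, which would otherwise distort the rate right after a restore, while the ForMeter counters always start from zero for the current run. A sketch of the registration order this implies, assuming SinkMetricData offers the same no-arg and Counter-accepting register methods for numRecordsOut/numBytesOut that it adds here for the ForMeter variants:

    import org.apache.flink.metrics.SimpleCounter;
    import org.apache.inlong.sort.base.metric.SinkMetricData;

    /** Illustrative registration order; not part of the patch. */
    public class SinkMeterRegistrationSketch {

        public static void register(SinkMetricData sinkMetricData, long restoredRecords, long restoredBytes) {
            // totals: seeded from the restored metric state so they survive restarts
            SimpleCounter recordsOut = new SimpleCounter();
            recordsOut.inc(restoredRecords);
            SimpleCounter bytesOut = new SimpleCounter();
            bytesOut.inc(restoredBytes);
            sinkMetricData.registerMetricsForNumRecordsOut(recordsOut);
            sinkMetricData.registerMetricsForNumBytesOut(bytesOut);

            // meter backing counters: start at zero so the rate reflects only the current run
            sinkMetricData.registerMetricsForNumRecordsOutForMeter();
            sinkMetricData.registerMetricsForNumBytesOutForMeter();
            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
        }
    }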
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
index d878381ba..416c8b719 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
@@ -21,6 +21,7 @@ package org.apache.inlong.sort.base.util;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 
 import java.util.ArrayList;
@@ -29,7 +30,9 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * metric state for {@link MetricState} supporting snapshot and restore
@@ -125,4 +128,25 @@ public class MetricStateUtils {
         metricStateListState.add(metricState);
     }
 
+    /**
+     * Snapshot metric state data for {@link SinkMetricData}
+     *
+     * @param metricStateListState state data list
+     * @param sinkMetricData {@link SinkMetricData} collection class for handling sink metrics
+     * @param subtaskIndex subtask index
+     * @throws Exception thrown when adding the metric state fails
+     */
+    public static void snapshotMetricStateForSinkMetricData(ListState<MetricState> metricStateListState,
+            SinkMetricData sinkMetricData, Integer subtaskIndex)
+            throws Exception {
+        log.info("snapshotMetricStateForSinkMetricData:{}, sinkMetricData:{}, subtaskIndex:{}",
+                metricStateListState, sinkMetricData, subtaskIndex);
+        metricStateListState.clear();
+        Map<String, Long> metricDataMap = new HashMap<>();
+        metricDataMap.put(NUM_RECORDS_OUT, sinkMetricData.getNumRecordsOut().getCount());
+        metricDataMap.put(NUM_BYTES_OUT, sinkMetricData.getNumBytesOut().getCount());
+        MetricState metricState = new MetricState(subtaskIndex, metricDataMap);
+        metricStateListState.add(metricState);
+    }
+
 }
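
The helper above is meant to be called from a sink operator's checkpoint hook. A sketch of that call site; only the MetricStateUtils call comes from the patch, the surrounding state wiring is illustrative Flink boilerplate:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.inlong.sort.base.metric.MetricState;
    import org.apache.inlong.sort.base.metric.SinkMetricData;
    import org.apache.inlong.sort.base.util.MetricStateUtils;

    /** Illustrative sink showing where the snapshot helper is invoked; not part of the patch. */
    public class MetricStateSinkSketch<T> extends RichSinkFunction<T> implements CheckpointedFunction {

        private transient ListState<MetricState> metricStateListState;
        private transient SinkMetricData sinkMetricData; // assumed to be created in open() from a MetricOption

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // the state name is illustrative; connectors in the patch use Constants.INLONG_METRIC_STATE_NAME
            metricStateListState = context.getOperatorStateStore().getUnionListState(
                    new ListStateDescriptor<>("inlong-metric-states",
                            TypeInformation.of(new TypeHint<MetricState>() {
                            })));
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            if (sinkMetricData != null) {
                MetricStateUtils.snapshotMetricStateForSinkMetricData(
                        metricStateListState, sinkMetricData,
                        getRuntimeContext().getIndexOfThisSubtask());
            }
        }

        @Override
        public void invoke(T value, Context context) {
            // write the value to the external system, then report, e.g. sinkMetricData.invokeWithEstimate(value)
        }
    }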
diff --git a/inlong-sort/sort-connectors/kafka/pom.xml b/inlong-sort/sort-connectors/kafka/pom.xml
index b3a844cd8..401c0dfbb 100644
--- a/inlong-sort/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-connectors/kafka/pom.xml
@@ -92,6 +92,12 @@
                                 </filter>
                             </filters>
                             <relocations>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>
+                                        org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.base
+                                    </shadedPattern>
+                                </relocation>
                                 <relocation>
                                     <pattern>org.apache.kafka</pattern>
                                     <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
new file mode 100644
index 000000000..924944188
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
@@ -0,0 +1,352 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getBoolean;
+import static org.apache.flink.util.PropertiesUtil.getLong;
+
+/**
+ * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
+ *
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from Apache
+ * Kafka. The consumer can run in multiple parallel instances, each of which will pull data from one
+ * or more Kafka partitions.
+ *
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once". (Note: These
+ * guarantees naturally assume that Kafka itself does not lose any data.)
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed
+ * checkpoints. The offsets committed to Kafka are only to bring the outside view of progress in
+ * sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of how
+ * far the Flink Kafka consumer has consumed a topic.
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs
+ */
+@PublicEvolving
+public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Configuration key to change the polling timeout. *
+     */
+    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
+
+    /**
+     * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+     * available. If 0, returns immediately with any records that are available now.
+     */
+    public static final long DEFAULT_POLL_TIMEOUT = 100L;
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * User-supplied properties for Kafka. *
+     */
+    protected final Properties properties;
+
+    /**
+     * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+     * available. If 0, returns immediately with any records that are available now
+     */
+    protected final long pollTimeout;
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a new Kafka streaming source consumer.
+     *
+     * @param topic The name of the topic that should be consumed.
+     * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and
+     *         Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            String topic, DeserializationSchema<T> valueDeserializer, Properties props, String inlongMetric,
+            String auditHostAndPorts) {
+        this(Collections.singletonList(topic), valueDeserializer, props, inlongMetric, auditHostAndPorts);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer.
+     *
+     * <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
+     * pairs, offsets, and topic names from Kafka.
+     *
+     * @param topic The name of the topic that should be consumed.
+     * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages
+     *         and Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            String topic, KafkaDeserializationSchema<T> deserializer, Properties props, String inlongMetric,
+            String auditHostAndPorts) {
+        this(Collections.singletonList(topic), deserializer, props, inlongMetric, auditHostAndPorts);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer.
+     *
+     * <p>This constructor allows passing multiple topics to the consumer.
+     *
+     * @param topics The Kafka topics to read from.
+     * @param deserializer The de-/serializer used to convert between Kafka's byte messages and
+     *         Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            List<String> topics, DeserializationSchema<T> deserializer, Properties props, String inlongMetric,
+            String auditHostAndPorts) {
+        this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props, inlongMetric, auditHostAndPorts);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer.
+     *
+     * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
+     *
+     * @param topics The Kafka topics to read from.
+     * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages
+     *         and Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props, String inlongMetric,
+            String auditHostAndPorts) {
+        this(topics, null, deserializer, props, inlongMetric, auditHostAndPorts);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer. Use this constructor to subscribe to multiple
+     * topics based on a regular expression pattern.
+     *
+     * <p>If partition discovery is enabled (by setting a non-negative value for {@link
+     * FlinkKafkaConsumer#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics with
+     * names matching the pattern will also be subscribed to as they are created on the fly.
+     *
+     * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe
+     *         to.
+     * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and
+     *         Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer,
+            Properties props, String inlongMetric, String auditHostAndPorts) {
+        this(
+                null,
+                subscriptionPattern,
+                new KafkaDeserializationSchemaWrapper<>(valueDeserializer),
+                props, inlongMetric, auditHostAndPorts);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer. Use this constructor to subscribe to multiple
+     * topics based on a regular expression pattern.
+     *
+     * <p>If partition discovery is enabled (by setting a non-negative value for {@link
+     * FlinkKafkaConsumer#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics with
+     * names matching the pattern will also be subscribed to as they are created on the fly.
+     *
+     * <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
+     * pairs, offsets, and topic names from Kafka.
+     *
+     * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe
+     *         to.
+     * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages
+     *         and Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            Pattern subscriptionPattern,
+            KafkaDeserializationSchema<T> deserializer,
+            Properties props, String inlongMetric, String auditHostAndPorts) {
+        this(null, subscriptionPattern, deserializer, props, inlongMetric, auditHostAndPorts);
+    }
+
+    private FlinkKafkaConsumer(
+            List<String> topics,
+            Pattern subscriptionPattern,
+            KafkaDeserializationSchema<T> deserializer,
+            Properties props, String inlongMetric,
+            String auditHostAndPorts) {
+
+        super(
+                topics,
+                subscriptionPattern,
+                deserializer,
+                getLong(
+                        checkNotNull(props, "props"),
+                        KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+                        PARTITION_DISCOVERY_DISABLED),
+                !getBoolean(props, KEY_DISABLE_METRICS, false), inlongMetric, auditHostAndPorts);
+
+        this.properties = props;
+        setDeserializer(this.properties);
+
+        // configure the polling timeout
+        try {
+            if (properties.containsKey(KEY_POLL_TIMEOUT)) {
+                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
+            } else {
+                this.pollTimeout = DEFAULT_POLL_TIMEOUT;
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    "Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
+        }
+    }
+
+    @Override
+    protected AbstractFetcher<T, ?> createFetcher(
+            SourceContext<T> sourceContext,
+            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+            StreamingRuntimeContext runtimeContext,
+            OffsetCommitMode offsetCommitMode,
+            MetricGroup consumerMetricGroup,
+            boolean useMetrics)
+            throws Exception {
+
+        // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
+        // this overwrites whatever setting the user configured in the properties
+        adjustAutoCommitConfig(properties, offsetCommitMode);
+
+        return new KafkaFetcher<>(
+                sourceContext,
+                assignedPartitionsWithInitialOffsets,
+                watermarkStrategy,
+                runtimeContext.getProcessingTimeService(),
+                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+                runtimeContext.getUserCodeClassLoader(),
+                runtimeContext.getTaskNameWithSubtasks(),
+                deserializer,
+                properties,
+                pollTimeout,
+                runtimeContext.getMetricGroup(),
+                consumerMetricGroup,
+                useMetrics);
+    }
+
+    @Override
+    protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+            KafkaTopicsDescriptor topicsDescriptor,
+            int indexOfThisSubtask,
+            int numParallelSubtasks) {
+
+        return new KafkaPartitionDiscoverer(
+                topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
+    }
+
+    @Override
+    protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
+            Collection<KafkaTopicPartition> partitions, long timestamp) {
+
+        Map<TopicPartition, Long> partitionOffsetsRequest = new HashMap<>(partitions.size());
+        for (KafkaTopicPartition partition : partitions) {
+            partitionOffsetsRequest.put(
+                    new TopicPartition(partition.getTopic(), partition.getPartition()), timestamp);
+        }
+
+        final Map<KafkaTopicPartition, Long> result = new HashMap<>(partitions.size());
+        // use a short-lived consumer to fetch the offsets;
+        // this is ok because this is a one-time operation that happens only on startup
+        try (KafkaConsumer<?, ?> consumer = new KafkaConsumer(properties)) {
+            for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
+                    consumer.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
+
+                result.put(
+                        new KafkaTopicPartition(
+                                partitionToOffset.getKey().topic(),
+                                partitionToOffset.getKey().partition()),
+                        (partitionToOffset.getValue() == null)
+                                ? null
+                                : partitionToOffset.getValue().offset());
+            }
+        }
+        return result;
+    }
+
+    @Override
+    protected boolean getIsAutoCommitEnabled() {
+        return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+                && PropertiesUtil.getLong(
+                properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000)
+                > 0;
+    }
+
+    /**
+     * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
+     *
+     * @param props The Kafka properties to register the deserializer in.
+     */
+    private static void setDeserializer(Properties props) {
+        final String deSerName = ByteArrayDeserializer.class.getName();
+
+        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
+            LOG.warn(
+                    "Ignoring configured key DeSerializer ({})",
+                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        }
+        if (valDeSer != null && !valDeSer.equals(deSerName)) {
+            LOG.warn(
+                    "Ignoring configured value DeSerializer ({})",
+                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+        }
+
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
+    }
+}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
new file mode 100644
index 000000000..0d0ab4544
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
@@ -0,0 +1,1350 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
+import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_FAILED_METRICS_COUNTER;
+import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_SUCCEEDED_METRICS_COUNTER;
+import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+
+/**
+ * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
+ *
+ * Base class of all Flink Kafka Consumer data sources. This implements the common behavior across
+ * all Kafka versions.
+ *
+ * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the {@link
+ * AbstractFetcher}.
+ *
+ * @param <T> The type of records produced by this data source
+ */
+@Internal
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
+        implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {
+
+    private static final long serialVersionUID = -6272159445203409112L;
+
+    protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+
+    /**
+     * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
+     */
+    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+    /**
+     * The default interval to execute partition discovery, in milliseconds ({@code Long.MIN_VALUE},
+     * i.e. disabled by default).
+     */
+    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
+
+    /**
+     * Boolean configuration key to disable metrics tracking.
+     */
+    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+    /**
+     * Configuration key to define the consumer's partition discovery interval, in milliseconds.
+     */
+    public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS =
+            "flink.partition-discovery.interval-millis";
+
+    /**
+     * State name of the consumer's partition offset states.
+     */
+    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
+
+    // ------------------------------------------------------------------------
+    //  configuration state, set on the client relevant for all subtasks
+    // ------------------------------------------------------------------------
+
+    /**
+     * Describes whether we are discovering partitions for fixed topics or a topic pattern.
+     */
+    private final KafkaTopicsDescriptor topicsDescriptor;
+
+    /**
+     * The schema to convert between Kafka's byte messages and Flink's objects.
+     */
+    protected final KafkaDeserializationSchema<T> deserializer;
+
+    /**
+     * The set of topic partitions that the source will read, with their initial offsets to start
+     * reading from.
+     */
+    private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
+
+    /**
+     * Optional watermark strategy that will be run per Kafka partition, to exploit per-partition
+     * timestamp characteristics. The watermark strategy is kept in serialized form, to deserialize
+     * it into multiple copies.
+     */
+    private SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
+
+    /**
+     * User-set flag determining whether or not to commit on checkpoints. Note: this flag does not
+     * represent the final offset commit mode.
+     */
+    private boolean enableCommitOnCheckpoints = true;
+
+    /**
+     * User-set flag to disable filtering restored partitions with current topics descriptor.
+     */
+    private boolean filterRestoredPartitionsWithCurrentTopicsDescriptor = true;
+
+    /**
+     * The offset commit mode for the consumer. The value of this can only be determined in {@link
+     * FlinkKafkaConsumerBase#open(Configuration)} since it depends on whether or not checkpointing
+     * is enabled for the job.
+     */
+    private OffsetCommitMode offsetCommitMode;
+
+    /**
+     * User configured value for discovery interval, in milliseconds.
+     */
+    private final long discoveryIntervalMillis;
+
+    /**
+     * The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}).
+     */
+    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+
+    /**
+     * Specific startup offsets; only relevant when startup mode is {@link
+     * StartupMode#SPECIFIC_OFFSETS}.
+     */
+    private Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+    /**
+     * Timestamp to determine startup offsets; only relevant when startup mode is {@link
+     * StartupMode#TIMESTAMP}.
+     */
+    private Long startupOffsetsTimestamp;
+
+    // ------------------------------------------------------------------------
+    //  runtime state (used individually by each parallel subtask)
+    // ------------------------------------------------------------------------
+
+    /**
+     * Data for pending but uncommitted offsets.
+     */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+    /**
+     * The fetcher implements the connections to the Kafka brokers.
+     */
+    private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
+
+    /**
+     * The partition discoverer, used to find new partitions.
+     */
+    private transient volatile AbstractPartitionDiscoverer partitionDiscoverer;
+
+    /**
+     * The offsets to restore to, if the consumer restores state from a checkpoint.
+     *
+     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+     * method.
+     *
+     * <p>Using a sorted map as the ordering is important when using restored state to seed the
+     * partition discoverer.
+     */
+    private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;
+
+    /**
+     * Accessor for state in the operator state backend.
+     */
+    private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
+
+    /**
+     * Discovery loop, executed in a separate thread.
+     */
+    private transient volatile Thread discoveryLoopThread;
+
+    /**
+     * Flag indicating whether the consumer is still running.
+     */
+    private volatile boolean running = true;
+
+    // ------------------------------------------------------------------------
+    //  internal metrics
+    // ------------------------------------------------------------------------
+
+    /**
+     * Flag indicating whether or not metrics should be exposed. If {@code true}, offset metrics
+     * (e.g. current offset, committed offset) and Kafka-shipped metrics will be registered.
+     */
+    private final boolean useMetrics;
+
+    /**
+     * Counter for successful Kafka offset commits.
+     */
+    private transient Counter successfulCommits;
+
+    /**
+     * Counter for failed Kafka offset commits.
+     */
+    private transient Counter failedCommits;
+
+    /**
+     * Callback interface that will be invoked upon async Kafka commit completion. Please be aware
+     * that the default callback implementation in the base class does not provide any guarantees
+     * on thread-safety. This is sufficient for now because the currently supported Kafka
+     * connectors guarantee no more than 1 concurrent async pending offset commit.
+     */
+    private transient KafkaCommitCallback offsetCommitCallback;
+
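+    /**
+     * Operator state used to snapshot the InLong metric values on checkpoints.
+     */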
+    private transient ListState<MetricState> metricStateListState;
+
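+    /**
+     * The InLong metric state restored from the last successful checkpoint, if any.
+     */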
+    private MetricState metricState;
+
+    /**
+     * The InLong metric label, carrying the InLong group id, stream id and node id of this source.
+     */
+    private String inlongMetric;
+    /**
+     * The host and port list of the InLong audit service.
+     */
+    private String inlongAudit;
+
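+    /**
+     * Helper that registers and reports the InLong source metrics, e.g. records and bytes read in.
+     */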
+    private SourceMetricData sourceMetricData;
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Base constructor.
+     *
+     * @param topics fixed list of topics to subscribe to (null, if using topic pattern)
+     * @param topicPattern the topic pattern to subscribe to (null, if using fixed topics)
+     * @param deserializer The deserializer to turn raw byte messages into Java/Scala objects.
+     * @param discoveryIntervalMillis the topic / partition discovery interval, in milliseconds
+     *         ({@link #PARTITION_DISCOVERY_DISABLED} if discovery is disabled).
+     * @param useMetrics whether offset metrics and Kafka-shipped metrics should be registered.
+     * @param inlongMetric the InLong metric label carrying the group id, stream id and node id.
+     * @param auditHostAndPorts the host and port list of the InLong audit service.
+     */
+    public FlinkKafkaConsumerBase(
+            List<String> topics,
+            Pattern topicPattern,
+            KafkaDeserializationSchema<T> deserializer,
+            long discoveryIntervalMillis,
+            boolean useMetrics, String inlongMetric, String auditHostAndPorts) {
+        this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
+        this.deserializer = checkNotNull(deserializer, "valueDeserializer");
+
+        checkArgument(
+                discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED
+                        || discoveryIntervalMillis >= 0,
+                "Cannot define a negative value for the topic / partition discovery interval.");
+        this.discoveryIntervalMillis = discoveryIntervalMillis;
+
+        this.useMetrics = useMetrics;
+        this.inlongMetric = inlongMetric;
+        this.inlongAudit = auditHostAndPorts;
+    }
+
+    /**
+     * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS. This
+     * overwrites whatever setting the user configured in the properties.
+     *
+     * @param properties - Kafka configuration properties to be adjusted
+     * @param offsetCommitMode offset commit mode
+     */
+    protected static void adjustAutoCommitConfig(
+            Properties properties, OffsetCommitMode offsetCommitMode) {
+        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS
+                || offsetCommitMode == OffsetCommitMode.DISABLED) {
+            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        }
+    }
+    // ------------------------------------------------------------------------
+    //  Configuration
+    // ------------------------------------------------------------------------
+
+    /**
+     * Sets the given {@link WatermarkStrategy} on this consumer. The strategy will be used to
+     * assign timestamps to records and to generate watermarks to signal event time progress.
+     *
+     * <p>Running timestamp extractors / watermark generators directly inside the Kafka source
+     * (which you can do by using this method), per Kafka partition, allows users to exploit the
+     * per-partition characteristics.
+     *
+     * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, the streams
+     * from the partitions are unioned in a "first come first serve" fashion. Per-partition
+     * characteristics are usually lost that way. For example, if the timestamps are strictly
+     * ascending per Kafka partition, they will not be strictly ascending in the resulting Flink
+     * DataStream, if the parallel source subtask reads more than one partition.
+     *
+     * <p>Common watermark generation patterns can be found as static methods in the {@link
+     * WatermarkStrategy} class.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
+            WatermarkStrategy<T> watermarkStrategy) {
+        checkNotNull(watermarkStrategy);
+
+        try {
+            ClosureCleaner.clean(
+                    watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+            this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    "The given WatermarkStrategy is not serializable", e);
+        }
+
+        return this;
+    }
+
+    /**
+     * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated
+     * manner. The watermark extractor will run per Kafka partition, watermarks will be merged
+     * across partitions in the same way as in the Flink runtime, when streams are merged.
+     *
+     * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, the streams
+     * from the partitions are unioned in a "first come first serve" fashion. Per-partition
+     * characteristics are usually lost that way. For example, if the timestamps are strictly
+     * ascending per Kafka partition, they will not be strictly ascending in the resulting Flink
+     * DataStream, if the parallel source subtask reads more than one partition.
+     *
+     * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per
+     * Kafka partition, allows users to exploit the per-partition characteristics.
+     *
+     * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an {@link
+     * AssignerWithPeriodicWatermarks}, not both at the same time.
+     *
+     * <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link
+     * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new
+     * interfaces support watermark idleness and no longer need to differentiate between "periodic"
+     * and "punctuated" watermarks.
+     *
+     * @param assigner The timestamp assigner / watermark generator to use.
+     * @return The consumer object, to allow function chaining.
+     * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
+     */
+    @Deprecated
+    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
+            AssignerWithPunctuatedWatermarks<T> assigner) {
+        checkNotNull(assigner);
+
+        if (this.watermarkStrategy != null) {
+            throw new IllegalStateException("Some watermark strategy has already been set.");
+        }
+
+        try {
+            ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+            final WatermarkStrategy<T> wms =
+                    new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(assigner);
+
+            return assignTimestampsAndWatermarks(wms);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("The given assigner is not serializable", e);
+        }
+    }
+
+    /**
+     * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks in a periodic
+     * manner. The watermark extractor will run per Kafka partition, watermarks will be merged
+     * across partitions in the same way as in the Flink runtime, when streams are merged.
+     *
+     * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, the streams
+     * from the partitions are unioned in a "first come first serve" fashion. Per-partition
+     * characteristics are usually lost that way. For example, if the timestamps are strictly
+     * ascending per Kafka partition, they will not be strictly ascending in the resulting Flink
+     * DataStream, if the parallel source subtask reads more than one partition.
+     *
+     * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per
+     * Kafka partition, allows users to exploit the per-partition characteristics.
+     *
+     * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an {@link
+     * AssignerWithPeriodicWatermarks}, not both at the same time.
+     *
+     * <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link
+     * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new
+     * interfaces support watermark idleness and no longer need to differentiate between "periodic"
+     * and "punctuated" watermarks.
+     *
+     * @param assigner The timestamp assigner / watermark generator to use.
+     * @return The consumer object, to allow function chaining.
+     * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
+     */
+    @Deprecated
+    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
+            AssignerWithPeriodicWatermarks<T> assigner) {
+        checkNotNull(assigner);
+
+        if (this.watermarkStrategy != null) {
+            throw new IllegalStateException("Some watermark strategy has already been set.");
+        }
+
+        try {
+            ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+            final WatermarkStrategy<T> wms =
+                    new AssignerWithPeriodicWatermarksAdapter.Strategy<>(assigner);
+
+            return assignTimestampsAndWatermarks(wms);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("The given assigner is not serializable", e);
+        }
+    }
+
+    /**
+     * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
+     *
+     * <p>This setting will only have effect if checkpointing is enabled for the job. If
+     * checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit"
+     * (for 0.9+) property settings will be used.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
+        this.enableCommitOnCheckpoints = commitOnCheckpoints;
+        return this;
+    }
+
+    /**
+     * Specifies the consumer to start reading from the earliest offset for all partitions. This
+     * lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+     *
+     * <p>This method does not affect where partitions are read from when the consumer is restored
+     * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+     * only the offsets in the restored state will be used.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
+        this.startupMode = StartupMode.EARLIEST;
+        this.startupOffsetsTimestamp = null;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Specifies the consumer to start reading from the latest offset for all partitions. This lets
+     * the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+     *
+     * <p>This method does not affect where partitions are read from when the consumer is restored
+     * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+     * only the offsets in the restored state will be used.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> setStartFromLatest() {
+        this.startupMode = StartupMode.LATEST;
+        this.startupOffsetsTimestamp = null;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Specifies the consumer to start reading partitions from a specified timestamp. The specified
+     * timestamp must be before the current timestamp. This lets the consumer ignore any committed
+     * group offsets in Zookeeper / Kafka brokers.
+     *
+     * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal to
+     * the specified timestamp from Kafka. If there's no such offset, the consumer will use the
+     * latest offset to read data from Kafka.
+     *
+     * <p>This method does not affect where partitions are read from when the consumer is restored
+     * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+     * only the offsets in the restored state will be used.
+     *
+     * @param startupOffsetsTimestamp timestamp for the startup offsets, as milliseconds from epoch.
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
+        checkArgument(
+                startupOffsetsTimestamp >= 0,
+                "The provided value for the startup offsets timestamp is invalid.");
+
+        long currentTimestamp = System.currentTimeMillis();
+        checkArgument(
+                startupOffsetsTimestamp <= currentTimestamp,
+                "Startup time[%s] must be before current time[%s].",
+                startupOffsetsTimestamp,
+                currentTimestamp);
+
+        this.startupMode = StartupMode.TIMESTAMP;
+        this.startupOffsetsTimestamp = startupOffsetsTimestamp;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Specifies the consumer to start reading from any committed group offsets found in Zookeeper /
+     * Kafka brokers. The "group.id" property must be set in the configuration properties. If no
+     * offset can be found for a partition, the behaviour in "auto.offset.reset" set in the
+     * configuration properties will be used for the partition.
+     *
+     * <p>This method does not affect where partitions are read from when the consumer is restored
+     * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+     * only the offsets in the restored state will be used.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
+        this.startupMode = StartupMode.GROUP_OFFSETS;
+        this.startupOffsetsTimestamp = null;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Specifies the consumer to start reading partitions from specific offsets, set independently
+     * for each partition. The specified offset should be the offset of the next record that will be
+     * read from partitions. This lets the consumer ignore any committed group offsets in Zookeeper
+     * / Kafka brokers.
+     *
+     * <p>If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not
+     * subscribed by the consumer, the entry will be ignored. If the consumer subscribes to a
+     * partition that does not exist in the provided map of offsets, the consumer will fallback to
+     * the default group offset behaviour (see {@link
+     * FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
+     *
+     * <p>If the specified offset for a partition is invalid, or the behaviour for that partition is
+     * defaulted to group offsets but still no group offset could be found for it, then the
+     * "auto.offset.reset" behaviour set in the configuration properties will be used for the
+     * partition.
+     *
+     * <p>This method does not affect where partitions are read from when the consumer is restored
+     * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+     * only the offsets in the restored state will be used.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(
+            Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+        this.startupOffsetsTimestamp = null;
+        this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
+        return this;
+    }
+
+    /**
+     * By default, when restoring from a checkpoint / savepoint, the consumer always ignores
+     * restored partitions that are no longer associated with the current specified topics or topic
+     * pattern to subscribe to.
+     *
+     * <p>This method configures the consumer to not filter the restored partitions, therefore
+     * always attempting to consume whatever partition was present in the previous execution
+     * regardless of the specified topics to subscribe to in the current execution.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics() {
+        this.filterRestoredPartitionsWithCurrentTopicsDescriptor = false;
+        return this;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Work methods
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        // determine the offset commit mode
+        this.offsetCommitMode =
+                OffsetCommitModes.fromConfiguration(
+                        getIsAutoCommitEnabled(),
+                        enableCommitOnCheckpoints,
+                        ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
+
+        // create the partition discoverer
+        this.partitionDiscoverer =
+                createPartitionDiscoverer(
+                        topicsDescriptor,
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        getRuntimeContext().getNumberOfParallelSubtasks());
+        this.partitionDiscoverer.open();
+
+        subscribedPartitionsToStartOffsets = new HashMap<>();
+        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
+        if (restoredState != null) {
+            for (KafkaTopicPartition partition : allPartitions) {
+                if (!restoredState.containsKey(partition)) {
+                    restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+                }
+            }
+
+            for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry :
+                    restoredState.entrySet()) {
+                // seed the partition discoverer with the union state while filtering out
+                // restored partitions that should not be subscribed by this subtask
+                if (KafkaTopicPartitionAssigner.assign(
+                        restoredStateEntry.getKey(),
+                        getRuntimeContext().getNumberOfParallelSubtasks())
+                        == getRuntimeContext().getIndexOfThisSubtask()) {
+                    subscribedPartitionsToStartOffsets.put(
+                            restoredStateEntry.getKey(), restoredStateEntry.getValue());
+                }
+            }
+
+            if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
+                subscribedPartitionsToStartOffsets
+                        .entrySet()
+                        .removeIf(
+                                entry -> {
+                                    if (!topicsDescriptor.isMatchingTopic(
+                                            entry.getKey().getTopic())) {
+                                        LOG.warn(
+                                                "{} is removed from subscribed partitions since it is no longer "
+                                                        + "associated with topics descriptor of current execution.",
+                                                entry.getKey());
+                                        return true;
+                                    }
+                                    return false;
+                                });
+            }
+
+            LOG.info(
+                    "Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
+                    getRuntimeContext().getIndexOfThisSubtask(),
+                    subscribedPartitionsToStartOffsets.size(),
+                    subscribedPartitionsToStartOffsets);
+        } else {
+            // use the partition discoverer to fetch the initial seed partitions,
+            // and set their initial offsets depending on the startup mode.
+            // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
+            // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily
+            // determined
+            // when the partition is actually read.
+            switch (startupMode) {
+                case SPECIFIC_OFFSETS:
+                    if (specificStartupOffsets == null) {
+                        throw new IllegalStateException(
+                                "Startup mode for the consumer set to "
+                                        + StartupMode.SPECIFIC_OFFSETS
+                                        + ", but no specific offsets were specified.");
+                    }
+
+                    for (KafkaTopicPartition seedPartition : allPartitions) {
+                        Long specificOffset = specificStartupOffsets.get(seedPartition);
+                        if (specificOffset != null) {
+                            // since the specified offsets represent the next record to read, we
+                            // subtract
+                            // it by one so that the initial state of the consumer will be correct
+                            subscribedPartitionsToStartOffsets.put(
+                                    seedPartition, specificOffset - 1);
+                        } else {
+                            // default to group offset behaviour if the user-provided specific
+                            // offsets
+                            // do not contain a value for this partition
+                            subscribedPartitionsToStartOffsets.put(
+                                    seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+                        }
+                    }
+
+                    break;
+                case TIMESTAMP:
+                    if (startupOffsetsTimestamp == null) {
+                        throw new IllegalStateException(
+                                "Startup mode for the consumer set to "
+                                        + StartupMode.TIMESTAMP
+                                        + ", but no startup timestamp was specified.");
+                    }
+
+                    for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset :
+                            fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp)
+                                    .entrySet()) {
+                        subscribedPartitionsToStartOffsets.put(
+                                partitionToOffset.getKey(),
+                                (partitionToOffset.getValue() == null)
+                                        // if an offset cannot be retrieved for a partition with the
+                                        // given timestamp,
+                                        // we default to using the latest offset for the partition
+                                        ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
+                                        // since the specified offsets represent the next record to
+                                        // read, we subtract
+                                        // it by one so that the initial state of the consumer will
+                                        // be correct
+                                        : partitionToOffset.getValue() - 1);
+                    }
+
+                    break;
+                default:
+                    for (KafkaTopicPartition seedPartition : allPartitions) {
+                        subscribedPartitionsToStartOffsets.put(
+                                seedPartition, startupMode.getStateSentinel());
+                    }
+            }
+
+            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
+                switch (startupMode) {
+                    case EARLIEST:
+                        LOG.info(
+                                "Consumer subtask {} will start reading the following {} partitions from the earliest"
+                                        + " offsets: {}",
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                subscribedPartitionsToStartOffsets.size(),
+                                subscribedPartitionsToStartOffsets.keySet());
+                        break;
+                    case LATEST:
+                        LOG.info(
+                                "Consumer subtask {} will start reading the following {} partitions from the latest "
+                                        + "offsets: {}",
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                subscribedPartitionsToStartOffsets.size(),
+                                subscribedPartitionsToStartOffsets.keySet());
+                        break;
+                    case TIMESTAMP:
+                        LOG.info(
+                                "Consumer subtask {} will start reading the following {} partitions from timestamp "
+                                        + "{}: {}",
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                subscribedPartitionsToStartOffsets.size(),
+                                startupOffsetsTimestamp,
+                                subscribedPartitionsToStartOffsets.keySet());
+                        break;
+                    case SPECIFIC_OFFSETS:
+                        LOG.info(
+                                "Consumer subtask {} will start reading the following {} partitions from the "
+                                        + "specified startup offsets {}: {}",
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                subscribedPartitionsToStartOffsets.size(),
+                                specificStartupOffsets,
+                                subscribedPartitionsToStartOffsets.keySet());
+
+                        List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets =
+                                new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+                        for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
+                                subscribedPartitionsToStartOffsets.entrySet()) {
+                            if (subscribedPartition.getValue()
+                                    == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+                                partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
+                            }
+                        }
+
+                        if (partitionsDefaultedToGroupOffsets.size() > 0) {
+                            LOG.warn(
+                                    "Consumer subtask {} cannot find offsets for the following {} partitions in the "
+                                            + "specified startup offsets: {}"
+                                            + "; their startup offsets will be defaulted to their committed group "
+                                            + "offsets in Kafka.",
+                                    getRuntimeContext().getIndexOfThisSubtask(),
+                                    partitionsDefaultedToGroupOffsets.size(),
+                                    partitionsDefaultedToGroupOffsets);
+                        }
+                        break;
+                    case GROUP_OFFSETS:
+                        LOG.info(
+                                "Consumer subtask {} will start reading the following {} partitions from the "
+                                        + "committed group offsets in Kafka: {}",
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                subscribedPartitionsToStartOffsets.size(),
+                                subscribedPartitionsToStartOffsets.keySet());
+                }
+            } else {
+                LOG.info(
+                        "Consumer subtask {} initially has no partitions to read from.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+            }
+        }
+
+        this.deserializer.open(
+                RuntimeContextInitializationContextAdapters.deserializationAdapter(
+                        getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
+    }
+
+    @Override
+    public void run(SourceContext<T> sourceContext) throws Exception {
+
+        if (StringUtils.isNotEmpty(this.inlongMetric)) {
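+            // The InLong metric label is expected to carry the group id, stream id and node id
+            // of this source, separated by the configured delimiter.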
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            String groupId = inlongMetricArray[0];
+            String streamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            AuditImp auditImp = null;
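+            // Only initialize the audit reporter when audit hosts are configured.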
+            if (inlongAudit != null) {
+                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
+                auditImp = AuditImp.getInstance();
+            }
+            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(),
+                    auditImp);
+            ThreadSafeCounter recordsInCounter = new ThreadSafeCounter();
+            ThreadSafeCounter bytesInCounter = new ThreadSafeCounter();
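+            // Restore counter values from the snapshotted metric state so that the reported
+            // metrics keep accumulating across restarts instead of starting from zero.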
+            if (metricState != null) {
+                recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
+                bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
+            }
+            sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
+            sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
+            sourceMetricData.registerMetricsForNumRecordsInForMeter(new ThreadSafeCounter());
+            sourceMetricData.registerMetricsForNumBytesInForMeter(new ThreadSafeCounter());
+            sourceMetricData.registerMetricsForNumBytesInPerSecond();
+            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
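+            // Hand the metric data over to the deserialization schema so that the per-record
+            // counters are updated while records are deserialized.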
+            if (this.deserializer instanceof DynamicKafkaDeserializationSchema) {
+                DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema =
+                        (DynamicKafkaDeserializationSchema) deserializer;
+                dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
+            }
+        }
+
+        if (subscribedPartitionsToStartOffsets == null) {
+            throw new Exception("The partitions were not set for the consumer");
+        }
+
+        // initialize commit metrics and default offset callback method
+        this.successfulCommits =
+                this.getRuntimeContext()
+                        .getMetricGroup()
+                        .counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
+        this.failedCommits =
+                this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
+        final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
+
+        this.offsetCommitCallback =
+                new KafkaCommitCallback() {
+                    @Override
+                    public void onSuccess() {
+                        successfulCommits.inc();
+                    }
+
+                    @Override
+                    public void onException(Throwable cause) {
+                        LOG.warn(
+                                String.format(
+                                        "Consumer subtask %d failed async Kafka commit.",
+                                        subtaskIndex),
+                                cause);
+                        failedCommits.inc();
+                    }
+                };
+
+        // mark the subtask as temporarily idle if there are no initial seed partitions;
+        // once this subtask discovers some partitions and starts collecting records, the subtask's
+        // status will automatically be triggered back to be active.
+        if (subscribedPartitionsToStartOffsets.isEmpty()) {
+            sourceContext.markAsTemporarilyIdle();
+        }
+
+        LOG.info(
+                "Consumer subtask {} creating fetcher with offsets {}.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                subscribedPartitionsToStartOffsets);
+        // from this point forward:
+        //   - 'snapshotState' will draw offsets from the fetcher,
+        //     instead of being built from `subscribedPartitionsToStartOffsets`
+        //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
+        //     Kafka through the fetcher, if configured to do so)
+        this.kafkaFetcher =
+                createFetcher(
+                        sourceContext,
+                        subscribedPartitionsToStartOffsets,
+                        watermarkStrategy,
+                        (StreamingRuntimeContext) getRuntimeContext(),
+                        offsetCommitMode,
+                        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
+                        useMetrics);
+
+        if (!running) {
+            return;
+        }
+
+        // depending on whether we were restored with the current state version (1.3),
+        // remaining logic branches off into 2 paths:
+        //  1) New state - partition discovery loop executed as separate thread, with this
+        //                 thread running the main fetcher loop
+        //  2) Old state - partition discovery is disabled and only the main fetcher loop is
+        // executed
+        if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
+            kafkaFetcher.runFetchLoop();
+        } else {
+            runWithPartitionDiscovery();
+        }
+    }
+
+    private void runWithPartitionDiscovery() throws Exception {
+        final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
+        createAndStartDiscoveryLoop(discoveryLoopErrorRef);
+
+        kafkaFetcher.runFetchLoop();
+
+        // make sure that the partition discoverer is woken up so that
+        // the discoveryLoopThread exits
+        partitionDiscoverer.wakeup();
+        joinDiscoveryLoopThread();
+
+        // rethrow any fetcher errors
+        final Exception discoveryLoopError = discoveryLoopErrorRef.get();
+        if (discoveryLoopError != null) {
+            throw new RuntimeException(discoveryLoopError);
+        }
+    }
+
+    @VisibleForTesting
+    void joinDiscoveryLoopThread() throws InterruptedException {
+        if (discoveryLoopThread != null) {
+            discoveryLoopThread.join();
+        }
+    }
+
+    private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
+        discoveryLoopThread =
+                new Thread(
+                        () -> {
+                            try {
+                                // --------------------- partition discovery loop
+                                // ---------------------
+
+                                // throughout the loop, we always eagerly check if we are still
+                                // running before
+                                // performing the next operation, so that we can escape the loop as
+                                // soon as possible
+
+                                while (running) {
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug(
+                                                "Consumer subtask {} is trying to discover new partitions ...",
+                                                getRuntimeContext().getIndexOfThisSubtask());
+                                    }
+
+                                    final List<KafkaTopicPartition> discoveredPartitions;
+                                    try {
+                                        discoveredPartitions =
+                                                partitionDiscoverer.discoverPartitions();
+                                    } catch (AbstractPartitionDiscoverer.WakeupException
+                                            | AbstractPartitionDiscoverer.ClosedException e) {
+                                        // the partition discoverer may have been closed or woken up
+                                        // before or during the discovery;
+                                        // this would only happen if the consumer was canceled;
+                                        // simply escape the loop
+                                        break;
+                                    }
+
+                                    // no need to add the discovered partitions if we were closed
+                                    // during the meantime
+                                    if (running && !discoveredPartitions.isEmpty()) {
+                                        kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
+                                    }
+
+                                    // do not waste any time sleeping if we're not running anymore
+                                    if (running && discoveryIntervalMillis != 0) {
+                                        try {
+                                            Thread.sleep(discoveryIntervalMillis);
+                                        } catch (InterruptedException iex) {
+                                            // may be interrupted if the consumer was canceled
+                                            // midway; simply escape the loop
+                                            break;
+                                        }
+                                    }
+                                }
+                            } catch (Exception e) {
+                                discoveryLoopErrorRef.set(e);
+                            } finally {
+                                // calling cancel will also let the fetcher loop escape
+                                // (if not running, cancel() was already called)
+                                if (running) {
+                                    cancel();
+                                }
+                            }
+                        },
+                        "Kafka Partition Discovery for "
+                                + getRuntimeContext().getTaskNameWithSubtasks());
+
+        discoveryLoopThread.start();
+    }
+
+    @Override
+    public void cancel() {
+        // set ourselves as not running;
+        // this would let the main discovery loop escape as soon as possible
+        running = false;
+
+        if (discoveryLoopThread != null) {
+
+            if (partitionDiscoverer != null) {
+                // we cannot close the discoverer here, as it is error-prone to concurrent access;
+                // only wakeup the discoverer, the discovery loop will clean itself up after it
+                // escapes
+                partitionDiscoverer.wakeup();
+            }
+
+            // the discovery loop may currently be sleeping in-between
+            // consecutive discoveries; interrupt to shutdown faster
+            discoveryLoopThread.interrupt();
+        }
+
+        // abort the fetcher, if there is one
+        if (kafkaFetcher != null) {
+            kafkaFetcher.cancel();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        cancel();
+
+        joinDiscoveryLoopThread();
+
+        Exception exception = null;
+        if (partitionDiscoverer != null) {
+            try {
+                partitionDiscoverer.close();
+            } catch (Exception e) {
+                exception = e;
+            }
+        }
+
+        try {
+            super.close();
+        } catch (Exception e) {
+            exception = ExceptionUtils.firstOrSuppressed(e, exception);
+        }
+
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint and restore
+    // ------------------------------------------------------------------------
+
+    @Override
+    public final void initializeState(FunctionInitializationContext context) throws Exception {
+
+        OperatorStateStore stateStore = context.getOperatorStateStore();
+
+        this.unionOffsetStates =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                OFFSETS_STATE_NAME,
+                                createStateSerializer(getRuntimeContext().getExecutionConfig())));
+
+        if (this.inlongMetric != null) {
+            this.metricStateListState =
+                    stateStore.getUnionListState(
+                            new ListStateDescriptor<>(
+                                    INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                            })));
+        }
+
+        if (context.isRestored()) {
+            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
+            // populate actual holder for restored state
+            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
+                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
+            }
+
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+
+            LOG.info(
+                    "Consumer subtask {} restored state: {}.",
+                    getRuntimeContext().getIndexOfThisSubtask(),
+                    restoredState);
+        } else {
+            LOG.info(
+                    "Consumer subtask {} has no restore state.",
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+
+    @Override
+    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
+        if (!running) {
+            LOG.debug("snapshotState() called on closed source");
+        } else {
+            unionOffsetStates.clear();
+
+            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+            if (fetcher == null) {
+                // the fetcher has not yet been initialized, which means we need to return the
+                // originally restored offsets or the assigned partitions
+                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
+                        subscribedPartitionsToStartOffsets.entrySet()) {
+                    unionOffsetStates.add(
+                            Tuple2.of(
+                                    subscribedPartition.getKey(), subscribedPartition.getValue()));
+                }
+
+                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+                    // the map cannot be asynchronously updated, because only one checkpoint call
+                    // can happen
+                    // on this function at a time: either snapshotState() or
+                    // notifyCheckpointComplete()
+                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
+                }
+            } else {
+                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
+
+                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+                    // the map cannot be asynchronously updated, because only one checkpoint call
+                    // can happen
+                    // on this function at a time: either snapshotState() or
+                    // notifyCheckpointComplete()
+                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+                }
+
+                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
+                        currentOffsets.entrySet()) {
+                    unionOffsetStates.add(
+                            Tuple2.of(
+                                    kafkaTopicPartitionLongEntry.getKey(),
+                                    kafkaTopicPartitionLongEntry.getValue()));
+                }
+            }
+
+            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+                // truncate the map of pending offsets to commit, to prevent infinite growth
+                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                    pendingOffsetsToCommit.remove(0);
+                }
+            }
+            if (sourceMetricData != null && metricStateListState != null) {
+                MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData,
+                        getRuntimeContext().getIndexOfThisSubtask());
+            }
+        }
+    }
+
+    @Override
+    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (!running) {
+            LOG.debug("notifyCheckpointComplete() called on closed source");
+            return;
+        }
+
+        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+        if (fetcher == null) {
+            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+            return;
+        }
+
+        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
+            // only one commit operation must be in progress
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        checkpointId);
+            }
+
+            try {
+                final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+                if (posInMap == -1) {
+                    LOG.warn(
+                            "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+                            getRuntimeContext().getIndexOfThisSubtask(),
+                            checkpointId);
+                    return;
+                }
+
+                @SuppressWarnings("unchecked")
+                Map<KafkaTopicPartition, Long> offsets =
+                        (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
+
+                // remove older checkpoints in map
+                for (int i = 0; i < posInMap; i++) {
+                    pendingOffsetsToCommit.remove(0);
+                }
+
+                if (offsets == null || offsets.size() == 0) {
+                    LOG.debug(
+                            "Consumer subtask {} has empty checkpoint state.",
+                            getRuntimeContext().getIndexOfThisSubtask());
+                    return;
+                }
+
+                fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
+            } catch (Exception e) {
+                if (running) {
+                    throw e;
+                }
+                // else ignore exception if we are no longer running
+            }
+        }
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) {
+    }
+
+    // ------------------------------------------------------------------------
+    //  Kafka Consumer specific methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates the fetcher that connects to the Kafka brokers, pulls data, deserializes the data, and
+     * emits it into the data streams.
+     *
+     * @param sourceContext The source context to emit data to.
+     * @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should
+     *         handle, with their start offsets.
+     * @param watermarkStrategy Optional, a serialized WatermarkStrategy.
+     * @param runtimeContext The task's runtime context.
+     * @return The instantiated fetcher
+     * @throws Exception The method should forward exceptions
+     */
+    protected abstract AbstractFetcher<T, ?> createFetcher(
+            SourceContext<T> sourceContext,
+            Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
+            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+            StreamingRuntimeContext runtimeContext,
+            OffsetCommitMode offsetCommitMode,
+            MetricGroup kafkaMetricGroup,
+            boolean useMetrics)
+            throws Exception;
+
+    /**
+     * Creates the partition discoverer that is used to find new partitions for this subtask.
+     *
+     * @param topicsDescriptor Descriptor that describes whether we are discovering partitions for
+     *         fixed topics or a topic pattern.
+     * @param indexOfThisSubtask The index of this consumer subtask.
+     * @param numParallelSubtasks The total number of parallel consumer subtasks.
+     * @return The instantiated partition discoverer
+     */
+    protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(
+            KafkaTopicsDescriptor topicsDescriptor,
+            int indexOfThisSubtask,
+            int numParallelSubtasks);
+
+    protected abstract boolean getIsAutoCommitEnabled();
+
+    protected abstract Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
+            Collection<KafkaTopicPartition> partitions, long timestamp);
+
+    // ------------------------------------------------------------------------
+    //  ResultTypeQueryable methods
+    // ------------------------------------------------------------------------
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializer.getProducedType();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Test utilities
+    // ------------------------------------------------------------------------
+
+    @VisibleForTesting
+    Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() {
+        return subscribedPartitionsToStartOffsets;
+    }
+
+    @VisibleForTesting
+    TreeMap<KafkaTopicPartition, Long> getRestoredState() {
+        return restoredState;
+    }
+
+    @VisibleForTesting
+    OffsetCommitMode getOffsetCommitMode() {
+        return offsetCommitMode;
+    }
+
+    @VisibleForTesting
+    LinkedMap getPendingOffsetsToCommit() {
+        return pendingOffsetsToCommit;
+    }
+
+    @VisibleForTesting
+    public boolean getEnableCommitOnCheckpoints() {
+        return enableCommitOnCheckpoints;
+    }
+
+    /**
+     * Creates the state serializer for the Kafka topic partition to offset tuple. Using an explicit
+     * state serializer with KryoSerializer is needed because otherwise users cannot use the
+     * 'disableGenericTypes' property with the KafkaConsumer.
+     */
+    @VisibleForTesting
+    static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateSerializer(
+            ExecutionConfig executionConfig) {
+        // the explicit serializer keeps compatibility with GenericTypeInformation and allows users
+        // to disableGenericTypes
+        TypeSerializer<?>[] fieldSerializers =
+                new TypeSerializer<?>[]{
+                        new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
+                        LongSerializer.INSTANCE
+                };
+        @SuppressWarnings("unchecked")
+        Class<Tuple2<KafkaTopicPartition, Long>> tupleClass =
+                (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
+        return new TupleSerializer<>(tupleClass, fieldSerializers);
+    }
+}
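
A minimal wiring sketch for the consumer above, assuming the extended list-of-topics constructor that KafkaDynamicSource uses below (topics, deserializer, properties, inlongMetric, auditHostAndPorts); the helper class and method here are hypothetical and not part of this change.

    import java.util.List;
    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.flink.table.data.RowData;
    import org.apache.inlong.sort.kafka.FlinkKafkaConsumer;

    public class InLongKafkaConsumerSketch {

        /**
         * Forwards the InLong metric label and the optional audit endpoints to the
         * copied consumer; when both are null it behaves like the vanilla connector.
         */
        public static FlinkKafkaConsumer<RowData> build(
                List<String> topics,
                KafkaDeserializationSchema<RowData> deserializer,
                Properties properties,
                String inlongMetric,        // groupId, streamId and nodeId joined by Constants.DELIMITER
                String auditHostAndPorts) { // may be null when audit reporting is disabled
            return new FlinkKafkaConsumer<>(topics, deserializer, properties, inlongMetric, auditHostAndPorts);
        }
    }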
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index b2efd2c3e..3f0902c0c 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -57,9 +58,10 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -78,7 +80,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -93,9 +94,13 @@ import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
@@ -256,6 +261,10 @@ public class FlinkKafkaProducer<IN>
     private SinkMetricData metricData;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
+
+    private transient ListState<MetricState> metricStateListState;
+
+    private MetricState metricState;
     /**
      * State for nextTransactionalIdHint.
      */
@@ -910,27 +919,27 @@ public class FlinkKafkaProducer<IN>
             inlongGroupId = inlongMetricArray[0];
             inlongStreamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup());
+            metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup(),
+                    auditHostAndPorts);
             metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
             metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
             metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
             metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+            metricData.registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter());
+            metricData.registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter());
             metricData.registerMetricsForNumBytesOutPerSecond();
             metricData.registerMetricsForNumRecordsOutPerSecond();
         }
-
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
+        if (metricState != null && metricData != null) {
+            metricData.getNumBytesOut().inc(metricState.getMetricValue(NUM_BYTES_OUT));
+            metricData.getNumRecordsOut().inc(metricState.getMetricValue(NUM_RECORDS_OUT));
         }
-
         super.open(configuration);
     }
 
     private void sendOutMetrics(Long rowSize, Long dataSize) {
         if (metricData != null) {
-            metricData.getNumRecordsOut().inc(rowSize);
-            metricData.getNumBytesOut().inc(dataSize);
+            metricData.invoke(rowSize, dataSize);
         }
     }
 
@@ -941,23 +950,6 @@ public class FlinkKafkaProducer<IN>
         }
     }
 
-    private void outputMetricForAudit(ProducerRecord<byte[], byte[]> record) {
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_OUTPUT,
-                    inlongGroupId,
-                    inlongStreamId,
-                    System.currentTimeMillis(),
-                    1,
-                    record.value().length);
-        }
-    }
-
-    private void resetMetricSize() {
-        dataSize = 0L;
-        rowSize = 0L;
-    }
-
     // ------------------- Logic for handling checkpoint flushing -------------------------- //
 
     @Override
@@ -965,7 +957,6 @@ public class FlinkKafkaProducer<IN>
             FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)
             throws FlinkKafkaException {
         checkErroneous();
-        resetMetricSize();
 
         ProducerRecord<byte[], byte[]> record;
         if (keyedSchema != null) {
@@ -1029,10 +1020,7 @@ public class FlinkKafkaProducer<IN>
                             + "is a bug.");
         }
 
-        rowSize++;
-        dataSize = dataSize + record.value().length;
-        sendOutMetrics(rowSize, dataSize);
-        outputMetricForAudit(record);
+        sendOutMetrics(1L, (long) record.value().length);
 
         pendingRecords.incrementAndGet();
         transaction.producer.send(record, callback);
@@ -1247,6 +1235,10 @@ public class FlinkKafkaProducer<IN>
                             getRuntimeContext().getNumberOfParallelSubtasks(),
                             nextFreeTransactionalId));
         }
+        if (metricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
     }
 
     @Override
@@ -1260,6 +1252,14 @@ public class FlinkKafkaProducer<IN>
             semantic = FlinkKafkaProducer.Semantic.NONE;
         }
 
+        if (this.inlongMetric != null) {
+            this.metricStateListState =
+                    context.getOperatorStateStore().getUnionListState(
+                            new ListStateDescriptor<>(
+                                    INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                            })));
+        }
+
         nextTransactionalIdHintState =
                 context.getOperatorStateStore()
                         .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
@@ -1313,6 +1313,11 @@ public class FlinkKafkaProducer<IN>
             }
         }
 
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+
         super.initializeState(context);
     }
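
The initializeState() change above restores the MetricState, and open() seeds the producer counters from it before any new records are counted, so reported totals survive a restart. A simplified, self-contained sketch of that restore-then-increment pattern; the RestoredMetricState class below is a hypothetical stand-in, not the InLong MetricState.

    import java.util.HashMap;
    import java.util.Map;

    public class CounterRestoreSketch {

        /** Hypothetical stand-in for a restored metric state snapshot. */
        static final class RestoredMetricState {
            private final Map<String, Long> metrics = new HashMap<>();

            void put(String name, long value) {
                metrics.put(name, value);
            }

            long getMetricValue(String name) {
                return metrics.getOrDefault(name, 0L);
            }
        }

        public static void main(String[] args) {
            RestoredMetricState state = new RestoredMetricState();
            state.put("numRecordsOut", 42L);
            state.put("numBytesOut", 4096L);

            long numRecordsOut = 0L;
            long numBytesOut = 0L;

            // On open(), counters resume from the snapshotted totals instead of
            // resetting to zero after a failover.
            numRecordsOut += state.getMetricValue("numRecordsOut");
            numBytesOut += state.getMetricValue("numBytesOut");

            System.out.println(numRecordsOut + " records, " + numBytesOut + " bytes restored");
        }
    }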
 
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index 17e92abda..c6b5c11a9 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -28,23 +28,18 @@ import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * deserialization schema for {@link KafkaDynamicSource}.
  */
-class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
 
     private static final long serialVersionUID = 1L;
 
@@ -63,18 +58,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
 
     private final boolean upsertMode;
 
-    private final String inlongMetric;
-
     private SourceMetricData metricData;
 
-    private String inlongGroupId;
-
-    private String auditHostAndPorts;
-
-    private String inlongStreamId;
-
-    private transient AuditImp auditImp;
-
     DynamicKafkaDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -84,9 +69,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
             boolean hasMetadata,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
-            boolean upsertMode,
-            String inlongMetric,
-            String auditHostAndPorts) {
+            boolean upsertMode) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -105,9 +88,10 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
                         upsertMode);
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
-        this.inlongMetric = inlongMetric;
-        this.auditHostAndPorts = auditHostAndPorts;
+    }
 
+    public void setMetricData(SourceMetricData metricData) {
+        this.metricData = metricData;
     }
 
     @Override
@@ -116,21 +100,6 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
             keyDeserialization.open(context);
         }
         valueDeserialization.open(context);
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            inlongGroupId = inlongMetricArray[0];
-            inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup());
-            metricData.registerMetricsForNumBytesIn();
-            metricData.registerMetricsForNumBytesInPerSecond();
-            metricData.registerMetricsForNumRecordsIn();
-            metricData.registerMetricsForNumRecordsInPerSecond();
-        }
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
-        }
     }
 
     @Override
@@ -178,26 +147,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
     }
 
     private void outputMetrics(ConsumerRecord<byte[], byte[]> record) {
-        outputMetricForFlink(record);
-        outputMetricForAudit(record);
-    }
-
-    private void outputMetricForAudit(ConsumerRecord<byte[], byte[]> record) {
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_INPUT,
-                    inlongGroupId,
-                    inlongStreamId,
-                    System.currentTimeMillis(),
-                    1,
-                    record.value().length);
-        }
-    }
-
-    private void outputMetricForFlink(ConsumerRecord<byte[], byte[]> record) {
         if (metricData != null) {
-            metricData.getNumBytesIn().inc(record.value().length);
-            metricData.getNumRecordsIn().inc(1);
+            metricData.outputMetrics(1, record.value().length);
         }
     }
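
With the audit and metric bookkeeping removed from the schema, the owning consumer now hands in a SourceMetricData through the new public setter. A minimal sketch of that wiring; the helper class and method name are hypothetical.

    import org.apache.inlong.sort.base.metric.SourceMetricData;
    import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;

    public class MetricInjectionSketch {

        /**
         * Injects the metric holder built by the consumer; afterwards every
         * deserialized record is reported through outputMetrics(rowCount, bodyBytes).
         */
        public static void wireMetrics(
                DynamicKafkaDeserializationSchema schema,
                SourceMetricData sourceMetricData) {
            schema.setMetricData(sourceMetricData);
        }
    }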
 
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index f3580a8f1..af784aad4 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -41,13 +40,12 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
-
+import org.apache.inlong.sort.kafka.FlinkKafkaConsumer;
 import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Header;
 
 import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -74,14 +72,21 @@ public class KafkaDynamicSource
     // Mutable attributes
     // --------------------------------------------------------------------------------------------
 
-    /** Data type that describes the final output of the source. */
+    /**
+     * Data type that describes the final output of the source.
+     */
     protected DataType producedDataType;
 
-    /** Metadata that is appended at the end of a physical source row. */
+    /**
+     * Metadata that is appended at the end of a physical source row.
+     */
     protected List<String> metadataKeys;
 
-    /** Watermark strategy that is used to generate per-partition watermark. */
-    protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
+    /**
+     * Watermark strategy that is used to generate per-partition watermark.
+     */
+    protected @Nullable
+    WatermarkStrategy<RowData> watermarkStrategy;
 
     // --------------------------------------------------------------------------------------------
     // Format attributes
@@ -89,35 +94,55 @@ public class KafkaDynamicSource
 
     private static final String VALUE_METADATA_PREFIX = "value.";
 
-    /** Data type to configure the formats. */
+    /**
+     * Data type to configure the formats.
+     */
     protected final DataType physicalDataType;
 
-    /** Optional format for decoding keys from Kafka. */
-    protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
+    /**
+     * Optional format for decoding keys from Kafka.
+     */
+    protected final @Nullable
+    DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
 
-    /** Format for decoding values from Kafka. */
+    /**
+     * Format for decoding values from Kafka.
+     */
     protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
 
-    /** Indices that determine the key fields and the target position in the produced row. */
+    /**
+     * Indices that determine the key fields and the target position in the produced row.
+     */
     protected final int[] keyProjection;
 
-    /** Indices that determine the value fields and the target position in the produced row. */
+    /**
+     * Indices that determine the value fields and the target position in the produced row.
+     */
     protected final int[] valueProjection;
 
-    /** Prefix that needs to be removed from fields when constructing the physical data type. */
-    protected final @Nullable String keyPrefix;
+    /**
+     * Prefix that needs to be removed from fields when constructing the physical data type.
+     */
+    protected final @Nullable
+    String keyPrefix;
 
     // --------------------------------------------------------------------------------------------
     // Kafka-specific attributes
     // --------------------------------------------------------------------------------------------
 
-    /** The Kafka topics to consume. */
+    /**
+     * The Kafka topics to consume.
+     */
     protected final List<String> topics;
 
-    /** The Kafka topic pattern to consume. */
+    /**
+     * The Kafka topic pattern to consume.
+     */
     protected final Pattern topicPattern;
 
-    /** Properties for the Kafka consumer. */
+    /**
+     * Properties for the Kafka consumer.
+     */
     protected final Properties properties;
 
     /**
@@ -137,7 +162,9 @@ public class KafkaDynamicSource
      */
     protected final long startupTimestampMillis;
 
-    /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */
+    /**
+     * Flag to determine source mode. In upsert mode, it will keep the tombstone message.
+     */
     protected final boolean upsertMode;
 
     protected final String inlongMetric;
@@ -214,7 +241,7 @@ public class KafkaDynamicSource
 
         final FlinkKafkaConsumer<RowData> kafkaConsumer =
                 createKafkaConsumer(keyDeserialization, valueDeserialization,
-                    producedTypeInfo, inlongMetric, auditHostAndPorts);
+                        producedTypeInfo, inlongMetric, auditHostAndPorts);
 
         return SourceFunctionProvider.of(kafkaConsumer, false);
     }
@@ -350,8 +377,8 @@ public class KafkaDynamicSource
             DeserializationSchema<RowData> keyDeserialization,
             DeserializationSchema<RowData> valueDeserialization,
             TypeInformation<RowData> producedTypeInfo,
-        String inlongMetric,
-        String auditHostAndPorts) {
+            String inlongMetric,
+            String auditHostAndPorts) {
 
         final MetadataConverter[] metadataConverters =
                 metadataKeys.stream()
@@ -390,13 +417,15 @@ public class KafkaDynamicSource
                         hasMetadata,
                         metadataConverters,
                         producedTypeInfo,
-                        upsertMode, inlongMetric, auditHostAndPorts);
+                        upsertMode);
 
         final FlinkKafkaConsumer<RowData> kafkaConsumer;
         if (topics != null) {
-            kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
+            kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties, inlongMetric,
+                    auditHostAndPorts);
         } else {
-            kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
+            kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties, inlongMetric,
+                    auditHostAndPorts);
         }
 
         switch (startupMode) {
@@ -425,7 +454,8 @@ public class KafkaDynamicSource
         return kafkaConsumer;
     }
 
-    private @Nullable DeserializationSchema<RowData> createDeserialization(
+    private @Nullable
+    DeserializationSchema<RowData> createDeserialization(
             DynamicTableSource.Context context,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
             int[] projection,
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index a7eebdbcd..e7084d2fa 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -429,6 +429,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
             sourceMetricData.registerMetricsForNumRecordsIn();
             sourceMetricData.registerMetricsForNumBytesIn();
+            sourceMetricData.registerMetricsForNumBytesInForMeter();
+            sourceMetricData.registerMetricsForNumRecordsInForMeter();
             sourceMetricData.registerMetricsForNumBytesInPerSecond();
             sourceMetricData.registerMetricsForNumRecordsInPerSecond();
         }
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 980c471b7..5dce25de4 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -570,6 +570,18 @@
   Source  : flink-table-runtime-blink_2.11-13.2-rc2 2.2.1 (Please note that the software have been modified.)
   License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
 
+ 1.3.11 inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
+  Source  : org.apache.flink:flink-connector-kafka_2.11:1.13.5 (Please note that the software has been modified.)
+  License : https://github.com/apache/flink/blob/master/LICENSE
+
 
 =======================================================================
 Apache InLong Subcomponents: