You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/10 12:07:55 UTC

[inlong] branch master updated: [INLONG-5464][Sort] Optimize the implementation of the sort-connector-base metric common abstract (#5465)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 576e469f7 [INLONG-5464][Sort] Optimize the implementation of the sort-connector-base metric common abstract (#5465)
576e469f7 is described below

commit 576e469f7c9765a5666e1e3ae58de292945997ed
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Wed Aug 10 20:07:51 2022 +0800

    [INLONG-5464][Sort] Optimize the implementation of the sort-connector-base metric common abstract (#5465)
---
 .../{connector-base => base}/pom.xml               |   4 +-
 .../org/apache/inlong/sort/base/Constants.java     |  17 +-
 .../apache/inlong/sort/base/metric/MetricData.java |  97 +++++++++++
 .../inlong/sort/base/metric/SinkMetricData.java    | 170 ++++++++++++++++++
 .../inlong/sort/base/metric/SourceMetricData.java  | 131 ++++++++++++++
 .../inlong/sort/base/metric/ThreadSafeCounter.java |   0
 .../sort/base/metric/ThreadSafeCounterTest.java    |   0
 .../inlong/sort/base/metric/SinkMetricData.java    | 192 ---------------------
 .../inlong/sort/base/metric/SourceMetricData.java  | 138 ---------------
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  |  27 +--
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |  27 +--
 .../inlong/sort/kafka/FlinkKafkaProducer.java      |  53 ++----
 .../table/DynamicKafkaDeserializationSchema.java   |  51 +++---
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   |  35 ++--
 .../sort/cdc/oracle/DebeziumSourceFunction.java    |  35 ++--
 inlong-sort/sort-connectors/pom.xml                |   2 +-
 .../DebeziumSourceFunction.java                    |  17 +-
 .../table/DynamicPulsarDeserializationSchema.java  |  65 +++----
 18 files changed, 527 insertions(+), 534 deletions(-)

diff --git a/inlong-sort/sort-connectors/connector-base/pom.xml b/inlong-sort/sort-connectors/base/pom.xml
similarity index 91%
rename from inlong-sort/sort-connectors/connector-base/pom.xml
rename to inlong-sort/sort-connectors/base/pom.xml
index e26db29ff..7106980d1 100644
--- a/inlong-sort/sort-connectors/connector-base/pom.xml
+++ b/inlong-sort/sort-connectors/base/pom.xml
@@ -16,8 +16,8 @@
   ~ limitations under the License.
   -->
 
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xmlns="http://maven.apache.org/POM/4.0.0"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>sort-connectors</artifactId>
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
similarity index 85%
rename from inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
rename to inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index e5a755aa6..54c911e77 100644
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -48,7 +48,22 @@ public final class Constants {
     public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond";
 
     public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond";
-
+    /**
+     * Time span in seconds used for the {@code MeterView} of each per-second meter metric
+     */
+    public static final Integer TIME_SPAN_IN_SECONDS = 60;
+    /**
+     * Stream id used in inlong metric
+     */
+    public static final String STREAM_ID = "streamId";
+    /**
+     * Group id used in inlong metric
+     */
+    public static final String GROUP_ID = "groupId";
+    /**
+     * Node id used in inlong metric
+     */
+    public static final String NODE_ID = "nodeId";
     /**
      * It is used for inlong.metric
      */
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
new file mode 100644
index 000000000..681318ff7
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricData.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+
+import static org.apache.inlong.sort.base.Constants.GROUP_ID;
+import static org.apache.inlong.sort.base.Constants.NODE_ID;
+import static org.apache.inlong.sort.base.Constants.STREAM_ID;
+import static org.apache.inlong.sort.base.Constants.TIME_SPAN_IN_SECONDS;
+
+/**
+ * This is the top-level interface for metric data; it defines the common metric data methods.
+ */
+public interface MetricData {
+
+    /**
+     * Get metric group
+     *
+     * @return The MetricGroup
+     */
+    MetricGroup getMetricGroup();
+
+    /**
+     * Get group id
+     *
+     * @return The group id defined in inlong
+     */
+    String getGroupId();
+
+    /**
+     * Get stream id
+     *
+     * @return The stream id defined in inlong
+     */
+    String getStreamId();
+
+    /**
+     * Get node id
+     *
+     * @return The node id defined in inlong
+     */
+    String getNodeId();
+
+    /**
+     * Register a counter metric
+     *
+     * @param metricName The metric name
+     * @param counter The counter of metric
+     * @return The registered counter
+     */
+    default Counter registerCounter(String metricName, Counter counter) {
+        return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId())
+                .addGroup(NODE_ID, getNodeId()).counter(metricName, counter);
+    }
+
+    /**
+     * Register a counter metric backed by a new {@link SimpleCounter}
+     *
+     * @param metricName The metric name
+     * @return The registered counter
+     */
+    default Counter registerCounter(String metricName) {
+        return registerCounter(metricName, new SimpleCounter());
+    }
+
+    /**
+     * Register a meter metric, a {@link MeterView} over the given counter
+     *
+     * @param metricName The metric name
+     * @return The registered meter
+     */
+    default Meter registerMeter(String metricName, Counter counter) {
+        return getMetricGroup().addGroup(GROUP_ID, getGroupId()).addGroup(STREAM_ID, getStreamId())
+                .addGroup(NODE_ID, getNodeId()).meter(metricName, new MeterView(counter, TIME_SPAN_IN_SECONDS));
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
new file mode 100644
index 000000000..f68ad9906
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -0,0 +1,170 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
+
+/**
+ * A collection class for registering and accessing sink metrics
+ */
+public class SinkMetricData implements MetricData {
+
+    private final MetricGroup metricGroup;
+    private final String groupId;
+    private final String streamId;
+    private final String nodeId;
+    private Counter numRecordsOut;
+    private Counter numBytesOut;
+    private Counter dirtyRecords;
+    private Counter dirtyBytes;
+    private Meter numRecordsOutPerSecond;
+    private Meter numBytesOutPerSecond;
+
+    public SinkMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
+        this.groupId = groupId;
+        this.streamId = streamId;
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * Registers the metric using a {@link SimpleCounter} as the default counter.
+     * The groupId, streamId and nodeId are label values that users can filter on when using the Prometheus
+     * metric reporter.
+     */
+    public void registerMetricsForNumRecordsOut() {
+        registerMetricsForNumRecordsOut(new SimpleCounter());
+    }
+
+    /**
+     * Registers the metric with a user-supplied counter that implements {@link Counter}.
+     * The groupId, streamId and nodeId are label values that users can filter on when using the Prometheus
+     * metric reporter.
+     */
+    public void registerMetricsForNumRecordsOut(Counter counter) {
+        numRecordsOut = registerCounter(NUM_RECORDS_OUT, counter);
+    }
+
+    /**
+     * Registers the metric using a {@link SimpleCounter} as the default counter.
+     * The groupId, streamId and nodeId are label values that users can filter on when using the Prometheus
+     * metric reporter.
+     */
+    public void registerMetricsForNumBytesOut() {
+        registerMetricsForNumBytesOut(new SimpleCounter());
+
+    }
+
+    /**
+     * Registers the metric with a user-supplied counter that implements {@link Counter}.
+     * The groupId, streamId and nodeId are label values that users can filter on when using the Prometheus
+     * metric reporter.
+     */
+    public void registerMetricsForNumBytesOut(Counter counter) {
+        numBytesOut = registerCounter(NUM_BYTES_OUT, counter);
+    }
+
+    public void registerMetricsForNumRecordsOutPerSecond() {
+        numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, this.numRecordsOut);
+    }
+
+    public void registerMetricsForNumBytesOutPerSecond() {
+        numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, this.numBytesOut);
+    }
+
+    public void registerMetricsForDirtyRecords() {
+        registerMetricsForDirtyRecords(new SimpleCounter());
+    }
+
+    public void registerMetricsForDirtyRecords(Counter counter) {
+        dirtyRecords = registerCounter(DIRTY_RECORDS, counter);
+    }
+
+    /**
+     * Registers the metric using a {@link SimpleCounter} as the default counter.
+     * The groupId, streamId and nodeId are label values that users can filter on when using the Prometheus
+     * metric reporter.
+     */
+    public void registerMetricsForDirtyBytes() {
+        registerMetricsForDirtyBytes(new SimpleCounter());
+    }
+
+    /**
+     * Registers the metric with a user-supplied counter that implements {@link Counter}.
+     * The groupId, streamId and nodeId are label values that users can filter on when using the Prometheus
+     * metric reporter.
+     */
+    public void registerMetricsForDirtyBytes(Counter counter) {
+        dirtyBytes = registerCounter(DIRTY_BYTES, counter);
+    }
+
+    public Counter getNumRecordsOut() {
+        return numRecordsOut;
+    }
+
+    public Counter getNumBytesOut() {
+        return numBytesOut;
+    }
+
+    public Counter getDirtyRecords() {
+        return dirtyRecords;
+    }
+
+    public Counter getDirtyBytes() {
+        return dirtyBytes;
+    }
+
+    public Meter getNumRecordsOutPerSecond() {
+        return numRecordsOutPerSecond;
+    }
+
+    public Meter getNumBytesOutPerSecond() {
+        return numBytesOutPerSecond;
+    }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return metricGroup;
+    }
+
+    @Override
+    public String getGroupId() {
+        return groupId;
+    }
+
+    @Override
+    public String getStreamId() {
+        return streamId;
+    }
+
+    @Override
+    public String getNodeId() {
+        return nodeId;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
new file mode 100644
index 000000000..e548784dc
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -0,0 +1,131 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
+
+/**
+ * A collection class for registering and accessing source metrics
+ */
+public class SourceMetricData implements MetricData {
+
+    private final MetricGroup metricGroup;
+    private final String groupId;
+    private final String streamId;
+    private final String nodeId;
+    private Counter numRecordsIn;
+    private Counter numBytesIn;
+    private Meter numRecordsInPerSecond;
+    private Meter numBytesInPerSecond;
+
+    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
+        this.groupId = groupId;
+        this.streamId = streamId;
+        this.nodeId = nodeId;
+        this.metricGroup = metricGroup;
+    }
+
+    /**
+     * Registers the metric using a {@link SimpleCounter} as the default counter.
+     * The groupId, streamId and nodeId are label values that users can filter on when using the Prometheus
+     * metric reporter.
+     */
+    public void registerMetricsForNumRecordsIn() {
+        registerMetricsForNumRecordsIn(new SimpleCounter());
+    }
+
+    /**
+     * Registers the metric with a user-supplied counter that implements {@link Counter}.
+     * The groupId, streamId and nodeId are label values that users can filter on when using the Prometheus
+     * metric reporter.
+     */
+    public void registerMetricsForNumRecordsIn(Counter counter) {
+        numRecordsIn = registerCounter(NUM_RECORDS_IN, counter);
+    }
+
+    /**
+     * Registers the metric using a {@link SimpleCounter} as the default counter.
+     * The groupId, streamId and nodeId are label values that users can filter on when using the Prometheus
+     * metric reporter.
+     */
+    public void registerMetricsForNumBytesIn() {
+        registerMetricsForNumBytesIn(new SimpleCounter());
+    }
+
+    /**
+     * Registers the metric with a user-supplied counter that implements {@link Counter}.
+     * The groupId, streamId and nodeId are label values that users can filter on when using the Prometheus
+     * metric reporter.
+     */
+    public void registerMetricsForNumBytesIn(Counter counter) {
+        numBytesIn = registerCounter(NUM_BYTES_IN, counter);
+    }
+
+    public void registerMetricsForNumRecordsInPerSecond() {
+        numRecordsInPerSecond = registerMeter(NUM_RECORDS_IN_PER_SECOND, this.numRecordsIn);
+    }
+
+    public void registerMetricsForNumBytesInPerSecond() {
+        numBytesInPerSecond = registerMeter(NUM_BYTES_IN_PER_SECOND, this.numBytesIn);
+    }
+
+    public Counter getNumRecordsIn() {
+        return numRecordsIn;
+    }
+
+    public Counter getNumBytesIn() {
+        return numBytesIn;
+    }
+
+    public Meter getNumRecordsInPerSecond() {
+        return numRecordsInPerSecond;
+    }
+
+    public Meter getNumBytesInPerSecond() {
+        return numBytesInPerSecond;
+    }
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return metricGroup;
+    }
+
+    @Override
+    public String getGroupId() {
+        return groupId;
+    }
+
+    @Override
+    public String getStreamId() {
+        return streamId;
+    }
+
+    @Override
+    public String getNodeId() {
+        return nodeId;
+    }
+}
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/ThreadSafeCounter.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/ThreadSafeCounter.java
similarity index 100%
rename from inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/ThreadSafeCounter.java
rename to inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/ThreadSafeCounter.java
diff --git a/inlong-sort/sort-connectors/connector-base/src/test/java/org/apache/inlong/sort/base/metric/ThreadSafeCounterTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/metric/ThreadSafeCounterTest.java
similarity index 100%
rename from inlong-sort/sort-connectors/connector-base/src/test/java/org/apache/inlong/sort/base/metric/ThreadSafeCounterTest.java
rename to inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/metric/ThreadSafeCounterTest.java
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
deleted file mode 100644
index 27c3a2194..000000000
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.inlong.sort.base.metric;
-
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
-
-/**
- * A collection class for handling metrics
- */
-public class SinkMetricData {
-
-    private final MetricGroup metricGroup;
-
-    private Counter numRecordsOut;
-    private Counter numBytesOut;
-    private Counter dirtyRecords;
-    private Counter dirtyBytes;
-    private Meter numRecordsOutPerSecond;
-    private Meter numBytesOutPerSecond;
-    private static Integer TIME_SPAN_IN_SECONDS = 60;
-    private static String STREAM_ID = "streamId";
-    private static String GROUP_ID = "groupId";
-    private static String NODE_ID = "nodeId";
-
-    public SinkMetricData(MetricGroup metricGroup) {
-        this.metricGroup = metricGroup;
-    }
-
-    /**
-     * Default counter is {@link SimpleCounter}
-     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
-     * prometheus
-     *
-     * @param groupId inlong groupId
-     * @param streamId inlong streamId
-     * @param nodeId inlong nodeId
-     * @param metricName metric name
-     */
-    public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName) {
-        registerMetricsForNumRecordsOut(groupId, streamId, nodeId, metricName, new SimpleCounter());
-    }
-
-    /**
-     * User can use custom counter that extends from {@link Counter}
-     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
-     * prometheus
-     *
-     * @param groupId inlong groupId
-     * @param streamId inlong streamId
-     * @param nodeId inlong nodeId
-     * @param metricName metric name
-     */
-    public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName,
-            Counter counter) {
-        numRecordsOut =
-                metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName, counter);
-    }
-
-    /**
-     * Default counter is {@link SimpleCounter}
-     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
-     * prometheus
-     *
-     * @param groupId inlong groupId
-     * @param streamId inlong streamId
-     * @param nodeId inlong nodeId
-     * @param metricName metric name
-     */
-    public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName) {
-        registerMetricsForNumBytesOut(groupId, streamId, nodeId, metricName, new SimpleCounter());
-    }
-
-    /**
-     * User can use custom counter that extends from {@link Counter}
-     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
-     * prometheus
-     *
-     * @param groupId inlong groupId
-     * @param streamId inlong streamId
-     * @param nodeId inlong nodeId
-     * @param metricName metric name
-     */
-    public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName,
-            Counter counter) {
-        numBytesOut =
-                metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName, counter);
-    }
-
-    public void registerMetricsForNumRecordsOutPerSecond(String groupId, String streamId, String nodeId,
-            String metricName) {
-        numRecordsOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
-                        nodeId)
-                .meter(metricName, new MeterView(this.numRecordsOut, TIME_SPAN_IN_SECONDS));
-    }
-
-    public void registerMetricsForNumBytesOutPerSecond(String groupId, String streamId, String nodeId,
-            String metricName) {
-        numBytesOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId)
-                .addGroup(NODE_ID, nodeId)
-                .meter(metricName, new MeterView(this.numBytesOut, TIME_SPAN_IN_SECONDS));
-    }
-
-    public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
-            String metricName) {
-        registerMetricsForDirtyRecords(groupId, streamId, nodeId, metricName, new SimpleCounter());
-    }
-
-    public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
-            String metricName, Counter counter) {
-        dirtyRecords = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                .counter(metricName, counter);
-    }
-
-    /**
-     * Default counter is {@link SimpleCounter}
-     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
-     * prometheus
-     *
-     * @param groupId inlong groupId
-     * @param streamId inlong streamId
-     * @param nodeId inlong nodeId
-     * @param metricName metric name
-     */
-    public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
-            String metricName) {
-        registerMetricsForDirtyBytes(groupId, streamId, nodeId, metricName, new SimpleCounter());
-    }
-
-    /**
-     * User can use custom counter that extends from {@link Counter}
-     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
-     * prometheus
-     *
-     * @param groupId inlong groupId
-     * @param streamId inlong streamId
-     * @param nodeId inlong nodeId
-     * @param metricName metric name
-     */
-    public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
-            String metricName, Counter counter) {
-        dirtyBytes =
-                metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName, counter);
-    }
-
-    public Counter getNumRecordsOut() {
-        return numRecordsOut;
-    }
-
-    public Counter getNumBytesOut() {
-        return numBytesOut;
-    }
-
-    public Counter getDirtyRecords() {
-        return dirtyRecords;
-    }
-
-    public Counter getDirtyBytes() {
-        return dirtyBytes;
-    }
-
-    public Meter getNumRecordsOutPerSecond() {
-        return numRecordsOutPerSecond;
-    }
-
-    public Meter getNumBytesOutPerSecond() {
-        return numBytesOutPerSecond;
-    }
-
-}
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
deleted file mode 100644
index a962e2938..000000000
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.inlong.sort.base.metric;
-
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
-
-/**
- * A collection class for handling metrics
- */
-public class SourceMetricData {
-
-    private static Integer TIME_SPAN_IN_SECONDS = 60;
-    private static String STREAM_ID = "streamId";
-    private static String GROUP_ID = "groupId";
-    private static String NODE_ID = "nodeId";
-    private final MetricGroup metricGroup;
-    private Counter numRecordsIn;
-    private Counter numBytesIn;
-    private Meter numRecordsInPerSecond;
-    private Meter numBytesInPerSecond;
-
-    public SourceMetricData(MetricGroup metricGroup) {
-        this.metricGroup = metricGroup;
-    }
-
-    /**
-     * Default counter is {@link SimpleCounter}
-     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
-     * prometheus
-     *
-     * @param groupId inlong groupId
-     * @param streamId inlong streamId
-     * @param nodeId inlong nodeId
-     * @param metricName metric name
-     */
-    public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName) {
-        registerMetricsForNumRecordsIn(groupId, streamId, nodeId, metricName, new SimpleCounter());
-    }
-
-    /**
-     * User can use custom counter that extends from {@link Counter}
-     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
-     * prometheus
-     *
-     * @param groupId inlong groupId
-     * @param streamId inlong streamId
-     * @param nodeId inlong nodeId
-     * @param metricName metric name
-     */
-    public void registerMetricsForNumRecordsIn(String groupId, String streamId, String nodeId, String metricName,
-            Counter counter) {
-        numRecordsIn =
-                metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName, counter);
-    }
-
-    /**
-     * Default counter is {@link SimpleCounter}
-     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
-     * prometheus
-     *
-     * @param groupId inlong groupId
-     * @param streamId inlong streamId
-     * @param nodeId inlong nodeId
-     * @param metricName metric name
-     */
-    public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName) {
-        registerMetricsForNumBytesIn(groupId, streamId, nodeId, metricName, new SimpleCounter());
-    }
-
-    /**
-     * User can use custom counter that extends from {@link Counter}
-     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
-     * prometheus
-     *
-     * @param groupId inlong groupId
-     * @param streamId inlong streamId
-     * @param nodeId inlong nodeId
-     * @param metricName metric name
-     */
-    public void registerMetricsForNumBytesIn(String groupId, String streamId, String nodeId, String metricName,
-            Counter counter) {
-        numBytesIn =
-                metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName, counter);
-    }
-
-    public void registerMetricsForNumRecordsInPerSecond(String groupId, String streamId, String nodeId,
-            String metricName) {
-        numRecordsInPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
-                        nodeId)
-                .meter(metricName, new MeterView(this.numRecordsIn, TIME_SPAN_IN_SECONDS));
-    }
-
-    public void registerMetricsForNumBytesInPerSecond(String groupId, String streamId, String nodeId,
-            String metricName) {
-        numBytesInPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId)
-                .addGroup(NODE_ID, nodeId)
-                .meter(metricName, new MeterView(this.numBytesIn, TIME_SPAN_IN_SECONDS));
-    }
-
-    public Counter getNumRecordsIn() {
-        return numRecordsIn;
-    }
-
-    public Counter getNumBytesIn() {
-        return numBytesIn;
-    }
-
-    public Meter getNumRecordsInPerSecond() {
-        return numRecordsInPerSecond;
-    }
-
-    public Meter getNumBytesInPerSecond() {
-        return numBytesInPerSecond;
-    }
-
-}
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 9ed713119..e4760866c 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -52,13 +52,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
-import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
-
 /**
  * The sink function for HBase.
  *
@@ -127,24 +120,18 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T>
         org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
         try {
             this.runtimeContext = getRuntimeContext();
-            sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
             if (inLongMetric != null && !inLongMetric.isEmpty()) {
                 String[] inLongMetricArray = inLongMetric.split(Constants.DELIMITER);
                 String groupId = inLongMetricArray[0];
                 String streamId = inLongMetricArray[1];
                 String nodeId = inLongMetricArray[2];
-                sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, DIRTY_BYTES,
-                        new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, DIRTY_RECORDS,
-                        new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, NUM_BYTES_OUT,
-                        new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, NUM_RECORDS_OUT,
-                        new ThreadSafeCounter());
-                sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId,
-                        NUM_BYTES_OUT_PER_SECOND);
-                sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
-                        NUM_RECORDS_OUT_PER_SECOND);
+                sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
+                sinkMetricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
+                sinkMetricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
+                sinkMetricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
+                sinkMetricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+                sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+                sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
             }
             this.mutationConverter.open();
             this.numPendingRequests = new AtomicLong(0);
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 55474def8..98f0f0cf6 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -51,17 +51,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-
 import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT;
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
-import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
-import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
 
 /**
  * A JDBC outputFormat that supports batching records before writing records to database.
@@ -137,24 +130,18 @@ public class JdbcBatchingOutputFormat<
     public void open(int taskNumber, int numTasks) throws IOException {
         super.open(taskNumber, numTasks);
         this.runtimeContext = getRuntimeContext();
-        sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
         if (inLongMetric != null && !inLongMetric.isEmpty()) {
             String[] inLongMetricArray = inLongMetric.split(DELIMITER);
             inLongGroupId = inLongMetricArray[0];
             inLongStreamId = inLongMetricArray[1];
             String nodeId = inLongMetricArray[2];
-            sinkMetricData.registerMetricsForDirtyBytes(inLongGroupId, inLongStreamId,
-                    nodeId, DIRTY_BYTES);
-            sinkMetricData.registerMetricsForDirtyRecords(inLongGroupId, inLongStreamId,
-                    nodeId, DIRTY_RECORDS);
-            sinkMetricData.registerMetricsForNumBytesOut(inLongGroupId, inLongStreamId,
-                    nodeId, NUM_BYTES_OUT);
-            sinkMetricData.registerMetricsForNumRecordsOut(inLongGroupId, inLongStreamId,
-                    nodeId, NUM_RECORDS_OUT);
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
-                    NUM_BYTES_OUT_PER_SECOND);
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
-                    NUM_RECORDS_OUT_PER_SECOND);
+            sinkMetricData = new SinkMetricData(inLongGroupId, inLongStreamId, nodeId, runtimeContext.getMetricGroup());
+            sinkMetricData.registerMetricsForDirtyBytes();
+            sinkMetricData.registerMetricsForDirtyRecords();
+            sinkMetricData.registerMetricsForNumBytesOut();
+            sinkMetricData.registerMetricsForNumRecordsOut();
+            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
         }
         if (auditHostAndPorts != null) {
             AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index 2df2b0893..93e7a8b31 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -93,16 +93,9 @@ import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
-import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
-import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
 
 /**
  * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
@@ -227,19 +220,6 @@ public class FlinkKafkaProducer<IN>
      * audit host and ports
      */
     private final String auditHostAndPorts;
-    /**
-     * audit implement
-     */
-    private transient AuditImp auditImp;
-    /**
-     * inLong groupId
-     */
-    private String inLongGroupId;
-    /**
-     * inLong streamId
-     */
-    private String inLongStreamId;
-
     /**
      * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
      */
@@ -258,6 +238,18 @@ public class FlinkKafkaProducer<IN>
      */
     @Nullable
     protected transient volatile Exception asyncException;
+    /**
+     * audit implementation
+     */
+    private transient AuditImp auditImp;
+    /**
+     * inLong groupId
+     */
+    private String inLongGroupId;
+    /**
+     * inLong streamId
+     */
+    private String inLongStreamId;
     /**
      * sink metric data
      */
@@ -913,25 +905,18 @@ public class FlinkKafkaProducer<IN>
                     RuntimeContextInitializationContextAdapters.serializationAdapter(
                             getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
         }
-
-        metricData = new SinkMetricData(ctx.getMetricGroup());
         if (inLongMetric != null && !inLongMetric.isEmpty()) {
             String[] inLongMetricArray = inLongMetric.split(DELIMITER);
             inLongGroupId = inLongMetricArray[0];
             inLongStreamId = inLongMetricArray[1];
             String nodeId = inLongMetricArray[2];
-            metricData.registerMetricsForDirtyBytes(inLongGroupId, inLongStreamId, nodeId, DIRTY_BYTES,
-                    new ThreadSafeCounter());
-            metricData.registerMetricsForDirtyRecords(inLongGroupId, inLongStreamId, nodeId, DIRTY_RECORDS,
-                    new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOut(inLongGroupId, inLongStreamId, nodeId, NUM_BYTES_OUT,
-                    new ThreadSafeCounter());
-            metricData.registerMetricsForNumRecordsOut(inLongGroupId, inLongStreamId, nodeId, NUM_RECORDS_OUT,
-                    new ThreadSafeCounter());
-            metricData.registerMetricsForNumBytesOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
-                    NUM_BYTES_OUT_PER_SECOND);
-            metricData.registerMetricsForNumRecordsOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
-                    NUM_RECORDS_OUT_PER_SECOND);
+            metricData = new SinkMetricData(inLongGroupId, inLongStreamId, nodeId, ctx.getMetricGroup());
+            metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
+            metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
+            metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
+            metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+            metricData.registerMetricsForNumBytesOutPerSecond();
+            metricData.registerMetricsForNumRecordsOutPerSecond();
         }
 
         if (auditHostAndPorts != null) {
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index f09b16b5b..ef8660280 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -18,14 +18,6 @@
 
 package org.apache.inlong.sort.kafka.table;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
@@ -41,6 +33,14 @@ import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
 /**
  * deserialization schema for {@link KafkaDynamicSource}.
  */
@@ -48,7 +48,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
 
     private static final long serialVersionUID = 1L;
 
-    private final @Nullable DeserializationSchema<RowData> keyDeserialization;
+    private final @Nullable
+    DeserializationSchema<RowData> keyDeserialization;
 
     private final DeserializationSchema<RowData> valueDeserialization;
 
@@ -85,7 +86,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
             TypeInformation<RowData> producedTypeInfo,
             boolean upsertMode,
             String inLongMetric,
-        String auditHostAndPorts) {
+            String auditHostAndPorts) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -120,14 +121,11 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
             inLongGroupId = inLongMetricArray[0];
             inLongStreamId = inLongMetricArray[1];
             String nodeId = inLongMetricArray[2];
-            metricData = new SourceMetricData(context.getMetricGroup());
-            metricData.registerMetricsForNumBytesIn(inLongGroupId, inLongStreamId, nodeId, "numBytesIn");
-            metricData.registerMetricsForNumBytesInPerSecond(inLongGroupId, inLongStreamId,
-                nodeId, "numBytesInPerSecond");
-            metricData.registerMetricsForNumRecordsIn(inLongGroupId, inLongStreamId,
-                nodeId, "numRecordsIn");
-            metricData.registerMetricsForNumRecordsInPerSecond(inLongGroupId, inLongStreamId,
-                nodeId, "numRecordsInPerSecond");
+            metricData = new SourceMetricData(inLongGroupId, inLongStreamId, nodeId, context.getMetricGroup());
+            metricData.registerMetricsForNumBytesIn();
+            metricData.registerMetricsForNumBytesInPerSecond();
+            metricData.registerMetricsForNumRecordsIn();
+            metricData.registerMetricsForNumRecordsInPerSecond();
         }
         if (auditHostAndPorts != null) {
             AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
@@ -187,12 +185,12 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
     private void outputMetricForAudit(ConsumerRecord<byte[], byte[]> record) {
         if (auditImp != null) {
             auditImp.add(
-                Constants.AUDIT_SORT_INPUT,
-                inLongGroupId,
-                inLongStreamId,
-                System.currentTimeMillis(),
-                1,
-                record.value().length);
+                    Constants.AUDIT_SORT_INPUT,
+                    inLongGroupId,
+                    inLongStreamId,
+                    System.currentTimeMillis(),
+                    1,
+                    record.value().length);
         }
     }
 
@@ -211,6 +209,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
     // --------------------------------------------------------------------------------------------
 
     interface MetadataConverter extends Serializable {
+
         Object read(ConsumerRecord<?, ?> record);
     }
 
@@ -311,8 +310,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
                 } else {
                     throw new DeserializationException(
                             "Invalid null value received "
-                                + "in non-upsert mode. Could not to "
-                                + "set row kind for output record.");
+                                    + "in non-upsert mode. Could not to "
+                                    + "set row kind for output record.");
                 }
             } else {
                 rowKind = physicalValueRow.getRowKind();
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 141cc5672..db41472ed 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -76,7 +76,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
 
@@ -112,40 +111,33 @@ import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHisto
 public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
 
-    private static final long serialVersionUID = -5808108641062931623L;
-
-    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
-
     /**
      * State name of the consumer's partition offset states.
      */
     public static final String OFFSETS_STATE_NAME = "offset-states";
-
     /**
      * State name of the consumer's history records state.
      */
     public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
-
     /**
      * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
      */
     public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
     /**
      * The configuration represents the Debezium MySQL Connector uses the legacy implementation or
      * not.
      */
     public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
-
     /**
      * The configuration value represents legacy implementation.
      */
     public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+    private static final long serialVersionUID = -5808108641062931623L;
 
     // ---------------------------------------------------------------------------------------
     // Properties
     // ---------------------------------------------------------------------------------------
-
     /**
      * The schema to convert from Debezium's messages into Flink's objects.
      */
@@ -166,21 +158,18 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
      * Data for pending but uncommitted offsets.
      */
     private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
-    /**
-     * Flag indicating whether the Debezium Engine is started.
-     */
-    private volatile boolean debeziumStarted = false;
-
     /**
      * Validator to validate the connected database satisfies the cdc connector's requirements.
      */
     private final Validator validator;
+    /**
+     * Flag indicating whether the Debezium Engine is started.
+     */
+    private volatile boolean debeziumStarted = false;
 
     // ---------------------------------------------------------------------------------------
     // State
     // ---------------------------------------------------------------------------------------
-
     /**
      * The offsets to restore to, if the consumer restores state from a checkpoint.
      *
@@ -425,13 +414,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            metricData = new SourceMetricData(metricGroup);
-            metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, Constants.NUM_RECORDS_IN);
-            metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, Constants.NUM_BYTES_IN);
-            metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId,
-                    Constants.NUM_BYTES_IN_PER_SECOND);
-            metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId,
-                    Constants.NUM_RECORDS_IN_PER_SECOND);
+            metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup);
+            metricData.registerMetricsForNumRecordsIn();
+            metricData.registerMetricsForNumBytesIn();
+            metricData.registerMetricsForNumBytesInPerSecond();
+            metricData.registerMetricsForNumRecordsInPerSecond();
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
index 166f7f1b7..5d7eb10ff 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
@@ -76,7 +76,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
 
@@ -112,40 +111,33 @@ import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHisto
 public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
 
-    private static final long serialVersionUID = -5808108641062931623L;
-
-    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
-
     /**
      * State name of the consumer's partition offset states.
      */
     public static final String OFFSETS_STATE_NAME = "offset-states";
-
     /**
      * State name of the consumer's history records state.
      */
     public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
-
     /**
      * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
      */
     public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
     /**
      * The configuration represents the Debezium MySQL Connector uses the legacy implementation or
      * not.
      */
     public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
-
     /**
      * The configuration value represents legacy implementation.
      */
     public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+    private static final long serialVersionUID = -5808108641062931623L;
 
     // ---------------------------------------------------------------------------------------
     // Properties
     // ---------------------------------------------------------------------------------------
-
     /**
      * The schema to convert from Debezium's messages into Flink's objects.
      */
@@ -166,21 +158,18 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
      * Data for pending but uncommitted offsets.
      */
     private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
-    /**
-     * Flag indicating whether the Debezium Engine is started.
-     */
-    private volatile boolean debeziumStarted = false;
-
     /**
      * Validator to validate the connected database satisfies the cdc connector's requirements.
      */
     private final Validator validator;
+    /**
+     * Flag indicating whether the Debezium Engine is started.
+     */
+    private volatile boolean debeziumStarted = false;
 
     // ---------------------------------------------------------------------------------------
     // State
     // ---------------------------------------------------------------------------------------
-
     /**
      * The offsets to restore to, if the consumer restores state from a checkpoint.
      *
@@ -425,13 +414,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            metricData = new SourceMetricData(metricGroup);
-            metricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, Constants.NUM_RECORDS_IN);
-            metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, Constants.NUM_BYTES_IN);
-            metricData.registerMetricsForNumBytesInPerSecond(groupId, streamId, nodeId,
-                    Constants.NUM_BYTES_IN_PER_SECOND);
-            metricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId, nodeId,
-                    Constants.NUM_RECORDS_IN_PER_SECOND);
+            metricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup);
+            metricData.registerMetricsForNumRecordsIn();
+            metricData.registerMetricsForNumBytesIn();
+            metricData.registerMetricsForNumBytesInPerSecond();
+            metricData.registerMetricsForNumRecordsInPerSecond();
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index fa6f26b80..fe6503227 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -35,7 +35,7 @@
     <packaging>pom</packaging>
 
     <modules>
-        <module>connector-base</module>
+        <module>base</module>
         <module>hive</module>
         <module>mysql-cdc</module>
         <module>kafka</module>
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index cc02fd1e2..e53ff76dc 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -76,13 +76,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
@@ -419,13 +414,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            sourceMetricData = new SourceMetricData(metricGroup);
-            sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId, nodeId, NUM_RECORDS_IN);
-            sourceMetricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, NUM_BYTES_IN);
-            sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId, streamId,
-                    nodeId, NUM_BYTES_IN_PER_SECOND);
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId,
-                    nodeId, NUM_RECORDS_IN_PER_SECOND);
+            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup);
+            sourceMetricData.registerMetricsForNumRecordsIn();
+            sourceMetricData.registerMetricsForNumBytesIn();
+            sourceMetricData.registerMetricsForNumBytesInPerSecond();
+            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 7f6b1fb95..ea0c995c8 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,17 +18,6 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
@@ -45,38 +34,36 @@ import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
 /**
  * A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}.
  */
 class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<RowData> {
 
     private static final long serialVersionUID = 1L;
-
+    private static final ThreadLocal<SimpleCollector<RowData>> tlsCollector =
+            new ThreadLocal<SimpleCollector<RowData>>() {
+                @Override
+                public SimpleCollector initialValue() {
+                    return new SimpleCollector();
+                }
+            };
     @Nullable
     private final DeserializationSchema<RowData> keyDeserialization;
-
     private final DeserializationSchema<RowData> valueDeserialization;
-
     private final boolean hasMetadata;
-
     private final OutputProjectionCollector outputCollector;
-
     private final TypeInformation<RowData> producedTypeInfo;
-
     private final boolean upsertMode;
-
     private SourceMetricData sourceMetricData;
-
     private String inlongMetric;
 
-    private static final ThreadLocal<SimpleCollector<RowData>> tlsCollector =
-            new ThreadLocal<SimpleCollector<RowData>>() {
-                @Override
-                public SimpleCollector initialValue() {
-                    return new SimpleCollector();
-                }
-            };
-
     DynamicPulsarDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -87,7 +74,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
             boolean upsertMode,
-    String inlongMetric) {
+            String inlongMetric) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -115,19 +102,15 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
         valueDeserialization.open(context);
 
         if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            sourceMetricData = new SourceMetricData(context.getMetricGroup());
             String[] inLongMetricArray = inlongMetric.split(DELIMITER);
             String groupId = inLongMetricArray[0];
             String streamId = inLongMetricArray[1];
             String nodeId = inLongMetricArray[2];
-            sourceMetricData.registerMetricsForNumBytesIn(groupId,
-                streamId, nodeId, NUM_BYTES_IN);
-            sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId,
-                streamId, nodeId, NUM_BYTES_IN_PER_SECOND);
-            sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId,
-                nodeId, NUM_RECORDS_IN);
-            sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId,
-                nodeId, NUM_RECORDS_IN_PER_SECOND);
+            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, context.getMetricGroup());
+            sourceMetricData.registerMetricsForNumBytesIn();
+            sourceMetricData.registerMetricsForNumBytesInPerSecond();
+            sourceMetricData.registerMetricsForNumRecordsIn();
+            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
         }
 
     }
@@ -153,7 +136,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             if (sourceMetricData != null) {
                 sourceMetricData.getNumRecordsIn().inc(1L);
                 sourceMetricData.getNumBytesIn()
-                    .inc(message.getData().length);
+                        .inc(message.getData().length);
             }
             return;
         }
@@ -176,7 +159,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
             if (sourceMetricData != null) {
                 sourceMetricData.getNumRecordsIn().inc(1L);
                 sourceMetricData.getNumBytesIn()
-                    .inc(message.getData().length);
+                        .inc(message.getData().length);
             }
         }
 
@@ -196,6 +179,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
     // --------------------------------------------------------------------------------------------
 
     interface MetadataConverter extends Serializable {
+        /** Reads a value from the given Pulsar message (presumably one metadata field; confirm). */
         Object read(Message<?> message);
     }
 
@@ -219,6 +203,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
     }
 
     private static class SimpleCollector<T> implements Collector<T> {
+
         private T record;
 
         @Override
@@ -320,7 +305,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<
                 } else {
                     throw new DeserializationException(
                             "Invalid null value received in non-upsert mode. "
-                                + "Could not to set row kind for output record.");
+                                    + "Could not to set row kind for output record.");
                 }
             } else {
                 rowKind = physicalValueRow.getRowKind();