You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/08 07:59:22 UTC

[inlong] 01/02: [INLONG-5608][Sort] Reformat connector codes for reporting metrics (#5612)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit bbffb2e840b208b944ab3874e65842a49ed264aa
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Fri Sep 2 20:31:15 2022 +0800

    [INLONG-5608][Sort] Reformat connector codes for reporting metrics (#5612)
---
 .../sort/elasticsearch6/ElasticsearchSink.java     | 18 +++++-----
 .../table/Elasticsearch6DynamicSink.java           | 18 +++++-----
 .../table/Elasticsearch6DynamicSinkFactory.java    |  4 +--
 .../sort/elasticsearch7/ElasticsearchSink.java     | 18 +++++-----
 .../table/Elasticsearch7DynamicSink.java           | 18 +++++-----
 .../table/Elasticsearch7DynamicSinkFactory.java    |  4 +--
 .../sort/elasticsearch/ElasticsearchSinkBase.java  | 17 +++++-----
 .../table/RowElasticsearchSinkFunction.java        | 20 ++++++------
 .../sort/hbase/HBase2DynamicTableFactory.java      |  2 +-
 .../org/apache/inlong/sort/hive/HiveTableSink.java | 12 +++----
 .../hive/filesystem/AbstractStreamingWriter.java   | 18 +++++-----
 .../sort/hive/filesystem/CompactFileWriter.java    |  4 +--
 .../sort/hive/filesystem/StreamingFileWriter.java  |  4 +--
 .../inlong/sort/hive/filesystem/StreamingSink.java |  8 ++---
 .../sort/hive/table/HiveTableInlongFactory.java    |  4 +--
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 14 ++++----
 .../sort/iceberg/sink/IcebergStreamWriter.java     | 18 +++++-----
 .../jdbc/internal/JdbcBatchingOutputFormat.java    | 38 +++++++++++-----------
 .../jdbc/internal/TableJdbcUpsertOutputFormat.java |  8 ++---
 .../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 10 +++---
 .../sort/jdbc/table/JdbcDynamicTableFactory.java   |  4 +--
 .../sort/jdbc/table/JdbcDynamicTableSink.java      | 14 ++++----
 .../inlong/sort/kafka/FlinkKafkaProducer.java      | 30 ++++++++---------
 .../apache/inlong/sort/kafka/KafkaDynamicSink.java | 14 ++++----
 .../table/DynamicKafkaDeserializationSchema.java   | 22 ++++++-------
 .../sort/kafka/table/KafkaDynamicSource.java       | 10 +++---
 .../sort/kafka/table/KafkaDynamicTableFactory.java | 16 ++++-----
 .../table/UpsertKafkaDynamicTableFactory.java      |  4 +--
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   |  3 +-
 .../mongodb/table/MongoDBTableSourceFactory.java   |  2 +-
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |  3 +-
 .../mysql/table/MySqlTableInlongSourceFactory.java |  2 +-
 .../sort/cdc/oracle/DebeziumSourceFunction.java    |  3 +-
 .../cdc/oracle/table/OracleTableSourceFactory.java |  2 +-
 .../DebeziumSourceFunction.java                    |  3 +-
 .../cdc/postgres/table/PostgreSQLTableFactory.java |  2 +-
 .../pulsar/table/PulsarDynamicTableFactory.java    |  8 +++--
 .../table/UpsertPulsarDynamicTableFactory.java     |  2 +-
 .../sqlserver/table/DebeziumSourceFunction.java    | 16 ++++-----
 .../cdc/sqlserver/table/SqlServerTableFactory.java |  2 +-
 40 files changed, 209 insertions(+), 210 deletions(-)

diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
index c30787d7c..32848dfa6 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
@@ -70,14 +70,14 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
             ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
             ActionRequestFailureHandler failureHandler,
             RestClientFactory restClientFactory,
-            String inLongMetric) {
+            String inlongMetric) {
 
         super(
                 new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),
                 bulkRequestsConfig,
                 elasticsearchSinkFunction,
                 failureHandler,
-                inLongMetric);
+                inlongMetric);
     }
 
     /**
@@ -95,7 +95,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
         private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
         private RestClientFactory restClientFactory = restClientBuilder -> {
         };
-        private String inLongMetric = null;
+        private String inlongMetric = null;
 
         /**
          * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link
@@ -114,10 +114,10 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
 
         /**
          * set InLongMetric for reporting metrics
-         * @param inLongMetric
+         * @param inlongMetric
          */
-        public void setInLongMetric(String inLongMetric) {
-            this.inLongMetric = inLongMetric;
+        public void setInLongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric;
         }
 
         /**
@@ -244,7 +244,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
                     elasticsearchSinkFunction,
                     failureHandler,
                     restClientFactory,
-                    inLongMetric
+                    inlongMetric
                     );
         }
 
@@ -262,7 +262,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
                     && Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig)
                     && Objects.equals(failureHandler, builder.failureHandler)
                     && Objects.equals(restClientFactory, builder.restClientFactory)
-                    && Objects.equals(inLongMetric, builder.inLongMetric);
+                    && Objects.equals(inlongMetric, builder.inlongMetric);
         }
 
         @Override
@@ -273,7 +273,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
                     bulkRequestsConfig,
                     failureHandler,
                     restClientFactory,
-                    inLongMetric);
+                    inlongMetric);
         }
     }
 }
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
index 2550edeb4..bff1ad476 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -64,7 +64,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
     private final EncodingFormat<SerializationSchema<RowData>> format;
     private final TableSchema schema;
     private final Elasticsearch6Configuration config;
-    private final String inLongMetric;
+    private final String inlongMetric;
     private final String auditHostAndPorts;
     private final ElasticSearchBuilderProvider builderProvider;
 
@@ -82,9 +82,9 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
             EncodingFormat<SerializationSchema<RowData>> format,
             Elasticsearch6Configuration config,
             TableSchema schema,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
-        this(format, config, schema, (ElasticsearchSink.Builder::new), inLongMetric, auditHostAndPorts);
+        this(format, config, schema, (ElasticsearchSink.Builder::new), inlongMetric, auditHostAndPorts);
     }
 
     Elasticsearch6DynamicSink(
@@ -92,13 +92,13 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
             Elasticsearch6Configuration config,
             TableSchema schema,
             ElasticSearchBuilderProvider builderProvider,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         this.format = format;
         this.schema = schema;
         this.config = config;
         this.builderProvider = builderProvider;
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
     }
 
@@ -133,7 +133,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
                             KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
                             RoutingExtractor.createRoutingExtractor(
                                     schema, config.getRoutingField().orElse(null)),
-                            inLongMetric,
+                            inlongMetric,
                             auditHostAndPorts);
 
             final ElasticsearchSink.Builder<RowData> builder =
@@ -144,7 +144,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
             builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
             builder.setBulkFlushInterval(config.getBulkFlushInterval());
             builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
-            builder.setInLongMetric(inLongMetric);
+            builder.setInLongMetric(inlongMetric);
             config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
             config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
             config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
@@ -198,12 +198,12 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
                 && Objects.equals(schema, that.schema)
                 && Objects.equals(config, that.config)
                 && Objects.equals(builderProvider, that.builderProvider)
-                && Objects.equals(inLongMetric, that.inLongMetric);
+                && Objects.equals(inlongMetric, that.inlongMetric);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(format, schema, config, builderProvider, inLongMetric);
+        return Objects.hash(format, schema, config, builderProvider, inlongMetric);
     }
 
     @FunctionalInterface
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
index ce1581ddd..df5937b64 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
@@ -107,12 +107,12 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory
 
         validate(config, configuration);
 
-        String inLongMetric = helper.getOptions().get(INLONG_METRIC);
+        String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null);
 
         String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(null);
 
         return new Elasticsearch6DynamicSink(
-                format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inLongMetric, auditHostAndPorts);
+                format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric, auditHostAndPorts);
     }
 
     private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
index eab2dd119..10b7cdfb8 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
@@ -71,14 +71,14 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
             ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
             ActionRequestFailureHandler failureHandler,
             RestClientFactory restClientFactory,
-            String inLongMetric) {
+            String inlongMetric) {
 
         super(
                 new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory),
                 bulkRequestsConfig,
                 elasticsearchSinkFunction,
                 failureHandler,
-                inLongMetric);
+                inlongMetric);
     }
 
     /**
@@ -96,7 +96,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
         private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
         private RestClientFactory restClientFactory = restClientBuilder -> {
         };
-        private String inLongMetric = null;
+        private String inlongMetric = null;
 
         /**
          * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link
@@ -115,10 +115,10 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
 
         /**
          * set InLongMetric for reporting metrics
-         * @param inLongMetric
+         * @param inlongMetric
          */
-        public void setInLongMetric(String inLongMetric) {
-            this.inLongMetric = inLongMetric;
+        public void setInLongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric;
         }
 
         /**
@@ -245,7 +245,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
                     elasticsearchSinkFunction,
                     failureHandler,
                     restClientFactory,
-                    inLongMetric);
+                    inlongMetric);
         }
 
         @Override
@@ -262,7 +262,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
                     && Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig)
                     && Objects.equals(failureHandler, builder.failureHandler)
                     && Objects.equals(restClientFactory, builder.restClientFactory)
-                    && Objects.equals(inLongMetric, builder.inLongMetric);
+                    && Objects.equals(inlongMetric, builder.inlongMetric);
         }
 
         @Override
@@ -273,7 +273,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
                     bulkRequestsConfig,
                     failureHandler,
                     restClientFactory,
-                    inLongMetric);
+                    inlongMetric);
         }
     }
 }
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
index 202990336..392e35dbf 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -65,7 +65,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
     private final EncodingFormat<SerializationSchema<RowData>> format;
     private final TableSchema schema;
     private final Elasticsearch7Configuration config;
-    private final String inLongMetric;
+    private final String inlongMetric;
     private final String auditHostAndPorts;
     private final ElasticSearchBuilderProvider builderProvider;
 
@@ -83,9 +83,9 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
             EncodingFormat<SerializationSchema<RowData>> format,
             Elasticsearch7Configuration config,
             TableSchema schema,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
-        this(format, config, schema, (ElasticsearchSink.Builder::new), inLongMetric, auditHostAndPorts);
+        this(format, config, schema, (ElasticsearchSink.Builder::new), inlongMetric, auditHostAndPorts);
     }
 
     Elasticsearch7DynamicSink(
@@ -93,13 +93,13 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
             Elasticsearch7Configuration config,
             TableSchema schema,
             ElasticSearchBuilderProvider builderProvider,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         this.format = format;
         this.schema = schema;
         this.config = config;
         this.builderProvider = builderProvider;
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
     }
 
@@ -134,7 +134,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
                             KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
                             RoutingExtractor.createRoutingExtractor(
                                     schema, config.getRoutingField().orElse(null)),
-                            inLongMetric,
+                            inlongMetric,
                             auditHostAndPorts);
 
             final ElasticsearchSink.Builder<RowData> builder =
@@ -145,7 +145,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
             builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
             builder.setBulkFlushInterval(config.getBulkFlushInterval());
             builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
-            builder.setInLongMetric(inLongMetric);
+            builder.setInLongMetric(inlongMetric);
             config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
             config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
             config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
@@ -199,13 +199,13 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
                 && Objects.equals(schema, that.schema)
                 && Objects.equals(config, that.config)
                 && Objects.equals(builderProvider, that.builderProvider)
-                && Objects.equals(inLongMetric, that.inLongMetric)
+                && Objects.equals(inlongMetric, that.inlongMetric)
                 && Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(format, schema, config, builderProvider, inLongMetric, auditHostAndPorts);
+        return Objects.hash(format, schema, config, builderProvider, inlongMetric, auditHostAndPorts);
     }
 
     @FunctionalInterface
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
index 0f27db9f1..d29c646bd 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
@@ -107,12 +107,12 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory
 
         validate(config, configuration);
 
-        String inLongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null);
+        String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null);
 
         String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(null);
 
         return new Elasticsearch7DynamicSink(
-                format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inLongMetric, auditHostAndPorts);
+                format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric, auditHostAndPorts);
     }
 
     private void validate(Elasticsearch7Configuration config, Configuration originalConfiguration) {
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index da5b5f9ef..000e1c23a 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * Base class for all Flink Elasticsearch Sinks.
@@ -121,7 +122,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
      * sink is closed.
      */
     private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-    private final String inLongMetric;
+    private final String inlongMetric;
     /**
      * If true, the producer will wait until all outstanding action requests have been sent to
      * Elasticsearch.
@@ -167,8 +168,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
             Map<String, String> userConfig,
             ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
             ActionRequestFailureHandler failureHandler,
-            String inLongMetric) {
-        this.inLongMetric = inLongMetric;
+            String inlongMetric) {
+        this.inlongMetric = inlongMetric;
         this.callBridge = checkNotNull(callBridge);
         this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
         this.failureHandler = checkNotNull(failureHandler);
@@ -265,11 +266,11 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
     @Override
     public void open(Configuration parameters) throws Exception {
         client = callBridge.createClient(userConfig);
-        if (inLongMetric != null && !inLongMetric.isEmpty()) {
-            String[] inLongMetricArray = inLongMetric.split("&");
-            String groupId = inLongMetricArray[0];
-            String streamId = inLongMetricArray[1];
-            String nodeId = inLongMetricArray[2];
+        if (inlongMetric != null && !inlongMetric.isEmpty()) {
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            String groupId = inlongMetricArray[0];
+            String streamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
             sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
             sinkMetricData.registerMetricsForDirtyBytes();
             sinkMetricData.registerMetricsForDirtyRecords();
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 80abeef54..0ae93231d 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -55,7 +55,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     private final XContentType contentType;
     private final RequestFactory requestFactory;
     private final Function<RowData, String> createKey;
-    private final String inLongMetric;
+    private final String inlongMetric;
     private final String auditHostAndPorts;
 
     private final Function<RowData, String> createRouting;
@@ -77,7 +77,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
             RequestFactory requestFactory,
             Function<RowData, String> createKey,
             @Nullable Function<RowData, String> createRouting,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
         this.docType = docType;
@@ -86,7 +86,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
         this.requestFactory = Preconditions.checkNotNull(requestFactory);
         this.createKey = Preconditions.checkNotNull(createKey);
         this.createRouting = createRouting;
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
     }
 
@@ -94,11 +94,11 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     public void open(RuntimeContext ctx) {
         indexGenerator.open();
         this.runtimeContext = ctx;
-        if (inLongMetric != null && !inLongMetric.isEmpty()) {
-            String[] inLongMetricArray = inLongMetric.split("&");
-            groupId = inLongMetricArray[0];
-            streamId = inLongMetricArray[1];
-            String nodeId = inLongMetricArray[2];
+        if (inlongMetric != null && !inlongMetric.isEmpty()) {
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            groupId = inlongMetricArray[0];
+            streamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
             sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
             sinkMetricData.registerMetricsForNumBytesOut();
             sinkMetricData.registerMetricsForNumRecordsOut();
@@ -202,7 +202,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
                 && contentType == that.contentType
                 && Objects.equals(requestFactory, that.requestFactory)
                 && Objects.equals(createKey, that.createKey)
-                && Objects.equals(inLongMetric, that.inLongMetric);
+                && Objects.equals(inlongMetric, that.inlongMetric);
     }
 
     @Override
@@ -214,6 +214,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
                 contentType,
                 requestFactory,
                 createKey,
-                inLongMetric);
+                inlongMetric);
     }
 }
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
index 63e3f87a4..da14c947a 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
@@ -104,7 +104,7 @@ public class HBase2DynamicTableFactory
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
-        String inlongMetric = tableOptions.get(INLONG_METRIC);
+        String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = tableOptions.get(INLONG_AUDIT);
         ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
index 821cfe72c..7d0048cea 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
@@ -118,7 +118,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
     private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();
     private boolean overwrite = false;
     private boolean dynamicGrouping = false;
-    private String inLongMetric;
+    private String inlongMetric;
     private String auditHostAndPorts;
 
     public HiveTableSink(
@@ -127,7 +127,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
             ObjectIdentifier identifier,
             CatalogTable table,
             @Nullable Integer configuredParallelism,
-            final String inLongMetric,
+            final String inlongMetric,
             final String auditHostAndPorts) {
         this.flinkConf = flinkConf;
         this.jobConf = jobConf;
@@ -140,7 +140,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
         hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
         tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
         this.configuredParallelism = configuredParallelism;
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
     }
 
@@ -339,7 +339,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
                             createCompactReaderFactory(sd, tableProps),
                             compactionSize,
                             parallelism,
-                            inLongMetric,
+                            inlongMetric,
                             auditHostAndPorts);
         } else {
             writerStream =
@@ -348,7 +348,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
                             bucketCheckInterval,
                             builder,
                             parallelism,
-                            inLongMetric,
+                            inlongMetric,
                             auditHostAndPorts);
         }
 
@@ -492,7 +492,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
                         identifier,
                         catalogTable,
                         configuredParallelism,
-                        inLongMetric,
+                        inlongMetric,
                         auditHostAndPorts);
         sink.staticPartitionSpec = staticPartitionSpec;
         sink.overwrite = overwrite;
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 16a7bd528..5d0ca7bf0 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -56,7 +56,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
             bucketsBuilder;
 
     @Nullable
-    private String inLongMetric;
+    private String inlongMetric;
 
     @Nullable
     private String auditHostAndPorts;
@@ -77,11 +77,11 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
             StreamingFileSink.BucketsBuilder<
                     IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>>
                     bucketsBuilder,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         this.bucketCheckInterval = bucketCheckInterval;
         this.bucketsBuilder = bucketsBuilder;
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
@@ -111,13 +111,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
     @Override
     public void open() throws Exception {
         super.open();
-        if (inLongMetric != null) {
-            String[] inLongMetricArray = inLongMetric.split(DELIMITER);
-            String inLongGroupId = inLongMetricArray[0];
-            String inLongStreamId = inLongMetricArray[1];
-            String nodeId = inLongMetricArray[2];
+        if (inlongMetric != null) {
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            String inlongGroupId = inlongMetricArray[0];
+            String inlongStreamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
             metricData = new SinkMetricData(
-                    inLongGroupId, inLongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
+                    inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
             metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
             metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
             metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java
index 22e4b3a5d..2c368ba19 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java
@@ -36,9 +36,9 @@ public class CompactFileWriter<T>
             StreamingFileSink.BucketsBuilder<
                     T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                     bucketsBuilder,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
-        super(bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts);
+        super(bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java
index 43703598a..b0c8aee26 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java
@@ -45,9 +45,9 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti
             StreamingFileSink.BucketsBuilder<
                     IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>>
                     bucketsBuilder,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
-        super(bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts);
+        super(bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
index 31bd6c1b6..1c7663064 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
@@ -65,10 +65,10 @@ public class StreamingSink {
                     T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                     bucketsBuilder,
             int parallelism,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         StreamingFileWriter<T> fileWriter =
-                new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts);
+                new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts);
         return inputStream
                 .transform(
                         StreamingFileWriter.class.getSimpleName(),
@@ -92,10 +92,10 @@ public class StreamingSink {
             CompactReader.Factory<T> readFactory,
             long targetFileSize,
             int parallelism,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         CompactFileWriter<T> writer = new CompactFileWriter<>(
-                bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts);
+                bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts);
 
         SupplierWithException<FileSystem, IOException> fsSupplier =
                 (SupplierWithException<FileSystem, IOException> & Serializable)
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
index 9dbe065aa..5a744cd0b 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
@@ -102,7 +102,7 @@ public class HiveTableInlongFactory implements DynamicTableSourceFactory, Dynami
                         Integer configuredParallelism =
                                 Configuration.fromMap(context.getCatalogTable().getOptions())
                                         .get(FileSystemOptions.SINK_PARALLELISM);
-            final String inLongMetric = context.getCatalogTable().getOptions()
+            final String inlongMetric = context.getCatalogTable().getOptions()
                     .getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue());
             final String auditHostAndPorts = context.getCatalogTable().getOptions()
                     .getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue());
@@ -113,7 +113,7 @@ public class HiveTableInlongFactory implements DynamicTableSourceFactory, Dynami
                     context.getObjectIdentifier(),
                     context.getCatalogTable(),
                     configuredParallelism,
-                    inLongMetric,
+                    inlongMetric,
                     auditHostAndPorts);
         } else {
             return FactoryUtil.createTableSink(
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index a7099c3cc..d86857338 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -134,7 +134,7 @@ public class FlinkSink {
         private boolean upsert = false;
         private List<String> equalityFieldColumns = null;
         private String uidPrefix = null;
-        private String inLongMetric = null;
+        private String inlongMetric = null;
         private String auditHostAndPorts = null;
 
         private Builder() {
@@ -198,12 +198,12 @@ public class FlinkSink {
 
         /**
          * Add metric output for iceberg writer
-         * @param inLongMetric
+         * @param inlongMetric
          * @param auditHostAndPorts
          * @return
          */
-        public Builder metric(String inLongMetric, String auditHostAndPorts) {
-            this.inLongMetric = inLongMetric;
+        public Builder metric(String inlongMetric, String auditHostAndPorts) {
+            this.inlongMetric = inlongMetric;
             this.auditHostAndPorts = auditHostAndPorts;
             return this;
         }
@@ -400,7 +400,7 @@ public class FlinkSink {
             }
 
             IcebergStreamWriter<RowData> streamWriter = createStreamWriter(
-                    table, flinkRowType, equalityFieldIds, upsertMode, inLongMetric, auditHostAndPorts);
+                    table, flinkRowType, equalityFieldIds, upsertMode, inlongMetric, auditHostAndPorts);
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
             SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -474,7 +474,7 @@ public class FlinkSink {
             RowType flinkRowType,
             List<Integer> equalityFieldIds,
             boolean upsert,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         Preconditions.checkArgument(table != null, "Iceberg table should't be null");
         Map<String, String> props = table.properties();
@@ -486,7 +486,7 @@ public class FlinkSink {
                 serializableTable, flinkRowType, targetFileSize,
                 fileFormat, equalityFieldIds, upsert);
 
-        return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inLongMetric, auditHostAndPorts);
+        return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts);
     }
 
     private static FileFormat getFileFormat(Map<String, String> properties) {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 118a97e15..75eca46c5 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -43,7 +43,7 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
 
     private final String fullTableName;
     private final TaskWriterFactory<T> taskWriterFactory;
-    private final String inLongMetric;
+    private final String inlongMetric;
     private final String auditHostAndPorts;
 
     private transient TaskWriter<T> writer;
@@ -55,11 +55,11 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
     IcebergStreamWriter(
             String fullTableName,
             TaskWriterFactory<T> taskWriterFactory,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         this.fullTableName = fullTableName;
         this.taskWriterFactory = taskWriterFactory;
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
@@ -76,13 +76,13 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         this.writer = taskWriterFactory.create();
 
         // Initialize metric
-        if (inLongMetric != null) {
-            String[] inLongMetricArray = inLongMetric.split(DELIMITER);
-            String inLongGroupId = inLongMetricArray[0];
-            String inLongStreamId = inLongMetricArray[1];
-            String nodeId = inLongMetricArray[2];
+        if (inlongMetric != null) {
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            String inlongGroupId = inlongMetricArray[0];
+            String inlongStreamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
             metricData = new SinkMetricData(
-                    inLongGroupId, inLongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
+                    inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
             metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
             metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
             metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 98f0f0cf6..66af78c4d 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -69,7 +69,7 @@ public class JdbcBatchingOutputFormat<
     private final JdbcExecutionOptions executionOptions;
     private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
     private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
-    private final String inLongMetric;
+    private final String inlongMetric;
     private final String auditHostAndPorts;
     private transient JdbcExec jdbcStatementExecutor;
     private transient int batchCount = 0;
@@ -80,8 +80,8 @@ public class JdbcBatchingOutputFormat<
     private transient RuntimeContext runtimeContext;
 
     private SinkMetricData sinkMetricData;
-    private String inLongGroupId;
-    private String inLongStreamId;
+    private String inlongGroupId;
+    private String inlongStreamId;
     private transient AuditImp auditImp;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
@@ -91,13 +91,13 @@ public class JdbcBatchingOutputFormat<
             @Nonnull JdbcExecutionOptions executionOptions,
             @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
             @Nonnull RecordExtractor<In, JdbcIn> recordExtractor,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         super(connectionProvider);
         this.executionOptions = checkNotNull(executionOptions);
         this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
         this.jdbcRecordExtractor = checkNotNull(recordExtractor);
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
     }
 
@@ -130,12 +130,12 @@ public class JdbcBatchingOutputFormat<
     public void open(int taskNumber, int numTasks) throws IOException {
         super.open(taskNumber, numTasks);
         this.runtimeContext = getRuntimeContext();
-        if (inLongMetric != null && !inLongMetric.isEmpty()) {
-            String[] inLongMetricArray = inLongMetric.split(DELIMITER);
-            inLongGroupId = inLongMetricArray[0];
-            inLongStreamId = inLongMetricArray[1];
-            String nodeId = inLongMetricArray[2];
-            sinkMetricData = new SinkMetricData(inLongGroupId, inLongStreamId, nodeId, runtimeContext.getMetricGroup());
+        if (inlongMetric != null && !inlongMetric.isEmpty()) {
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            inlongGroupId = inlongMetricArray[0];
+            inlongStreamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            sinkMetricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, runtimeContext.getMetricGroup());
             sinkMetricData.registerMetricsForDirtyBytes();
             sinkMetricData.registerMetricsForDirtyRecords();
             sinkMetricData.registerMetricsForNumBytesOut();
@@ -207,8 +207,8 @@ public class JdbcBatchingOutputFormat<
         if (auditImp != null) {
             auditImp.add(
                     AUDIT_SORT_INPUT,
-                    inLongGroupId,
-                    inLongStreamId,
+                    inlongGroupId,
+                    inlongStreamId,
                     System.currentTimeMillis(),
                     1,
                     length);
@@ -371,7 +371,7 @@ public class JdbcBatchingOutputFormat<
         private String[] fieldNames;
         private String[] keyFields;
         private int[] fieldTypes;
-        private String inLongMetric;
+        private String inlongMetric;
         private String auditHostAndPorts;
         private JdbcExecutionOptions.Builder executionOptionsBuilder =
                 JdbcExecutionOptions.builder();
@@ -409,10 +409,10 @@ public class JdbcBatchingOutputFormat<
         }
 
         /**
-         * required, inLongMetric
+         * required, inlongMetric
          */
-        public Builder setinLongMetric(String inLongMetric) {
-            this.inLongMetric = inLongMetric;
+        public Builder setinlongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric;
             return this;
         }
 
@@ -471,7 +471,7 @@ public class JdbcBatchingOutputFormat<
                         new SimpleJdbcConnectionProvider(options),
                         dml,
                         executionOptionsBuilder.build(),
-                        inLongMetric,
+                        inlongMetric,
                         auditHostAndPorts);
             } else {
                 // warn: don't close over builder fields
@@ -493,7 +493,7 @@ public class JdbcBatchingOutputFormat<
                             Preconditions.checkArgument(tuple2.f0);
                             return tuple2.f1;
                         },
-                        inLongMetric,
+                        inlongMetric,
                         auditHostAndPorts);
             }
         }
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
index 4ef1ff2e3..4101913c9 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -58,14 +58,14 @@ class TableJdbcUpsertOutputFormat
             JdbcConnectionProvider connectionProvider,
             JdbcDmlOptions dmlOptions,
             JdbcExecutionOptions batchOptions,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         this(
                 connectionProvider,
                 batchOptions,
                 ctx -> createUpsertRowExecutor(dmlOptions, ctx),
                 ctx -> createDeleteExecutor(dmlOptions, ctx),
-                inLongMetric,
+                inlongMetric,
                 auditHostAndPorts);
     }
 
@@ -76,11 +76,11 @@ class TableJdbcUpsertOutputFormat
             StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> statementExecutorFactory,
             StatementExecutorFactory<JdbcBatchStatementExecutor<Row>>
                     deleteStatementExecutorFactory,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts
     ) {
         super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1,
-                inLongMetric, auditHostAndPorts);
+                inlongMetric, auditHostAndPorts);
         this.deleteStatementExecutorFactory = deleteStatementExecutorFactory;
     }
 
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index 1e0369e2c..fa46f8436 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -64,7 +64,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
     private boolean appendMode;
     private TypeInformation<RowData> rowDataTypeInformation;
     private DataType[] fieldDataTypes;
-    private String inLongMetric;
+    private String inlongMetric;
     private String auditHostAndPorts;
 
     public JdbcDynamicOutputFormatBuilder() {
@@ -236,8 +236,8 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
         return this;
     }
 
-    public JdbcDynamicOutputFormatBuilder setInLongMetric(String inLongMetric) {
-        this.inLongMetric = inLongMetric;
+    public JdbcDynamicOutputFormatBuilder setInLongMetric(String inlongMetric) {
+        this.inlongMetric = inlongMetric;
         return this;
     }
 
@@ -264,7 +264,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
                             createBufferReduceExecutor(
                                     dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
                     JdbcBatchingOutputFormat.RecordExtractor.identity(),
-                    inLongMetric,
+                    inlongMetric,
                     auditHostAndPorts);
         } else {
             // append only query
@@ -285,7 +285,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
                                     sql,
                                     rowDataTypeInformation),
                     JdbcBatchingOutputFormat.RecordExtractor.identity(),
-                    inLongMetric,
+                    inlongMetric,
                     auditHostAndPorts);
         }
     }
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index 328f86a45..1efe0a91b 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -188,7 +188,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
         TableSchema physicalSchema =
                 TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
         boolean appendMode = config.get(SINK_APPEND_MODE);
-        String inLongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String auditHostAndPorts = config.getOptional(INLONG_AUDIT).orElse(null);
         return new JdbcDynamicTableSink(
                 jdbcOptions,
@@ -196,7 +196,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
                 getJdbcDmlOptions(jdbcOptions, physicalSchema),
                 physicalSchema,
                 appendMode,
-                inLongMetric,
+                inlongMetric,
                 auditHostAndPorts);
     }
 
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
index 92e54e816..f6dd579b3 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -50,7 +50,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
     private final TableSchema tableSchema;
     private final String dialectName;
 
-    private final String inLongMetric;
+    private final String inlongMetric;
     private final String auditHostAndPorts;
     private final boolean appendMode;
 
@@ -60,7 +60,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
             JdbcDmlOptions dmlOptions,
             TableSchema tableSchema,
             boolean appendMode,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         this.jdbcOptions = jdbcOptions;
         this.executionOptions = executionOptions;
@@ -68,7 +68,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
         this.tableSchema = tableSchema;
         this.dialectName = dmlOptions.getDialect().dialectName();
         this.appendMode = appendMode;
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
     }
 
@@ -101,7 +101,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
         builder.setJdbcExecutionOptions(executionOptions);
         builder.setRowDataTypeInfo(rowDataTypeInformation);
         builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
-        builder.setInLongMetric(inLongMetric);
+        builder.setInLongMetric(inlongMetric);
         builder.setAuditHostAndPorts(auditHostAndPorts);
         return SinkFunctionProvider.of(
                 new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
@@ -110,7 +110,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
     @Override
     public DynamicTableSink copy() {
         return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions,
-                tableSchema, appendMode, inLongMetric, auditHostAndPorts);
+                tableSchema, appendMode, inlongMetric, auditHostAndPorts);
     }
 
     @Override
@@ -132,13 +132,13 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
                 && Objects.equals(dmlOptions, that.dmlOptions)
                 && Objects.equals(tableSchema, that.tableSchema)
                 && Objects.equals(dialectName, that.dialectName)
-                && Objects.equals(inLongMetric, that.inLongMetric)
+                && Objects.equals(inlongMetric, that.inlongMetric)
                 && Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName,
-                inLongMetric, auditHostAndPorts);
+                inlongMetric, auditHostAndPorts);
     }
 }
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index 75b965340..b2efd2c3e 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -215,7 +215,7 @@ public class FlinkKafkaProducer<IN>
     /**
      * Metric for InLong
      */
-    private final String inLongMetric;
+    private final String inlongMetric;
     /**
      * audit host and ports
      */
@@ -245,11 +245,11 @@ public class FlinkKafkaProducer<IN>
     /**
      * inLong groupId
      */
-    private String inLongGroupId;
+    private String inlongGroupId;
     /**
      * inLong streamId
      */
-    private String inLongStreamId;
+    private String inlongStreamId;
     /**
      * sink metric data
      */
@@ -609,7 +609,7 @@ public class FlinkKafkaProducer<IN>
             Properties producerConfig,
             FlinkKafkaProducer.Semantic semantic,
             int kafkaProducersPoolSize,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         this(
                 defaultTopic,
@@ -619,7 +619,7 @@ public class FlinkKafkaProducer<IN>
                 producerConfig,
                 semantic,
                 kafkaProducersPoolSize,
-                inLongMetric,
+                inlongMetric,
                 auditHostAndPorts);
     }
 
@@ -659,13 +659,13 @@ public class FlinkKafkaProducer<IN>
             Properties producerConfig,
             FlinkKafkaProducer.Semantic semantic,
             int kafkaProducersPoolSize,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         super(
                 new FlinkKafkaProducer.TransactionStateSerializer(),
                 new FlinkKafkaProducer.ContextStateSerializer());
 
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
 
         this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null");
@@ -905,12 +905,12 @@ public class FlinkKafkaProducer<IN>
                     RuntimeContextInitializationContextAdapters.serializationAdapter(
                             getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
         }
-        if (inLongMetric != null && !inLongMetric.isEmpty()) {
-            String[] inLongMetricArray = inLongMetric.split(DELIMITER);
-            inLongGroupId = inLongMetricArray[0];
-            inLongStreamId = inLongMetricArray[1];
-            String nodeId = inLongMetricArray[2];
-            metricData = new SinkMetricData(inLongGroupId, inLongStreamId, nodeId, ctx.getMetricGroup());
+        if (inlongMetric != null && !inlongMetric.isEmpty()) {
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            inlongGroupId = inlongMetricArray[0];
+            inlongStreamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup());
             metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
             metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
             metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
@@ -945,8 +945,8 @@ public class FlinkKafkaProducer<IN>
         if (auditImp != null) {
             auditImp.add(
                     Constants.AUDIT_SORT_OUTPUT,
-                    inLongGroupId,
-                    inLongStreamId,
+                    inlongGroupId,
+                    inlongStreamId,
                     System.currentTimeMillis(),
                     1,
                     record.value().length);
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
index 387baee79..6f66a6e3d 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
@@ -139,7 +139,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
     /**
      * Metric for inLong
      */
-    private final String inLongMetric;
+    private final String inlongMetric;
     /**
      * audit host and ports
      */
@@ -172,7 +172,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
             boolean upsertMode,
             SinkBufferFlushMode flushMode,
             @Nullable Integer parallelism,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         // Format attributes
         this.consumedDataType =
@@ -200,7 +200,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                     "Sink buffer flush is only supported in upsert-kafka.");
         }
         this.parallelism = parallelism;
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
     }
 
@@ -302,7 +302,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                         upsertMode,
                         flushMode,
                         parallelism,
-                        inLongMetric,
+                        inlongMetric,
                         auditHostAndPorts);
         copy.metadataKeys = metadataKeys;
         return copy;
@@ -337,7 +337,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                 && Objects.equals(upsertMode, that.upsertMode)
                 && Objects.equals(flushMode, that.flushMode)
                 && Objects.equals(parallelism, that.parallelism)
-                && Objects.equals(inLongMetric, that.inLongMetric)
+                && Objects.equals(inlongMetric, that.inlongMetric)
                 && Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
     }
 
@@ -359,7 +359,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                 upsertMode,
                 flushMode,
                 parallelism,
-                inLongMetric,
+                inlongMetric,
                 auditHostAndPorts);
     }
 
@@ -420,7 +420,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                 properties,
                 FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
                 FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE,
-                inLongMetric,
+                inlongMetric,
                 auditHostAndPorts);
     }
 
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index ef8660280..17e92abda 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -67,11 +67,11 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
 
     private SourceMetricData metricData;
 
-    private String inLongGroupId;
+    private String inlongGroupId;
 
     private String auditHostAndPorts;
 
-    private String inLongStreamId;
+    private String inlongStreamId;
 
     private transient AuditImp auditImp;
 
@@ -85,7 +85,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
             boolean upsertMode,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         if (upsertMode) {
             Preconditions.checkArgument(
@@ -105,7 +105,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
                         upsertMode);
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
-        this.inlongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
 
     }
@@ -117,11 +117,11 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
         }
         valueDeserialization.open(context);
         if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inLongMetricArray = inlongMetric.split(DELIMITER);
-            inLongGroupId = inLongMetricArray[0];
-            inLongStreamId = inLongMetricArray[1];
-            String nodeId = inLongMetricArray[2];
-            metricData = new SourceMetricData(inLongGroupId, inLongStreamId, nodeId, context.getMetricGroup());
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            inlongGroupId = inlongMetricArray[0];
+            inlongStreamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup());
             metricData.registerMetricsForNumBytesIn();
             metricData.registerMetricsForNumBytesInPerSecond();
             metricData.registerMetricsForNumRecordsIn();
@@ -186,8 +186,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
         if (auditImp != null) {
             auditImp.add(
                     Constants.AUDIT_SORT_INPUT,
-                    inLongGroupId,
-                    inLongStreamId,
+                    inlongGroupId,
+                    inlongStreamId,
                     System.currentTimeMillis(),
                     1,
                     record.value().length);
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index d49af2d00..f3580a8f1 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -140,7 +140,7 @@ public class KafkaDynamicSource
     /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */
     protected final boolean upsertMode;
 
-    protected final String inLongMetric;
+    protected final String inlongMetric;
 
     protected final String auditHostAndPorts;
 
@@ -158,7 +158,7 @@ public class KafkaDynamicSource
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
             long startupTimestampMillis,
             boolean upsertMode,
-            final String inLongMetric,
+            final String inlongMetric,
             final String auditHostAndPorts) {
         // Format attributes
         this.physicalDataType =
@@ -192,7 +192,7 @@ public class KafkaDynamicSource
                         specificStartupOffsets, "Specific offsets must not be null.");
         this.startupTimestampMillis = startupTimestampMillis;
         this.upsertMode = upsertMode;
-        this.inLongMetric = inLongMetric;
+        this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
     }
 
@@ -214,7 +214,7 @@ public class KafkaDynamicSource
 
         final FlinkKafkaConsumer<RowData> kafkaConsumer =
                 createKafkaConsumer(keyDeserialization, valueDeserialization,
-                    producedTypeInfo, inLongMetric, auditHostAndPorts);
+                    producedTypeInfo, inlongMetric, auditHostAndPorts);
 
         return SourceFunctionProvider.of(kafkaConsumer, false);
     }
@@ -284,7 +284,7 @@ public class KafkaDynamicSource
                         startupMode,
                         specificStartupOffsets,
                         startupTimestampMillis,
-                        upsertMode, inLongMetric, auditHostAndPorts);
+                        upsertMode, inlongMetric, auditHostAndPorts);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 8efee1317..2127886d3 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -254,7 +254,7 @@ public class KafkaDynamicTableFactory
 
         final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
 
-        final String inLongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
 
         final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
 
@@ -271,7 +271,7 @@ public class KafkaDynamicTableFactory
                 startupOptions.startupMode,
                 startupOptions.specificOffsets,
                 startupOptions.startupTimestampMillis,
-            inLongMetric,
+            inlongMetric,
             auditHostAndPorts);
     }
 
@@ -307,7 +307,7 @@ public class KafkaDynamicTableFactory
 
         final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
 
-        final String inLongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
 
         final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
 
@@ -324,7 +324,7 @@ public class KafkaDynamicTableFactory
                 getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
                 getSinkSemantic(tableOptions),
                 parallelism,
-                inLongMetric,
+                inlongMetric,
                 auditHostAndPorts);
     }
 
@@ -343,7 +343,7 @@ public class KafkaDynamicTableFactory
             StartupMode startupMode,
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
             long startupTimestampMillis,
-            String inLongMetric,
+            String inlongMetric,
         String auditHostAndPorts) {
         return new KafkaDynamicSource(
                 physicalDataType,
@@ -359,7 +359,7 @@ public class KafkaDynamicTableFactory
                 specificStartupOffsets,
                 startupTimestampMillis,
                 false,
-            inLongMetric,
+            inlongMetric,
             auditHostAndPorts);
     }
 
@@ -376,7 +376,7 @@ public class KafkaDynamicTableFactory
             FlinkKafkaPartitioner<RowData> partitioner,
             KafkaSinkSemantic semantic,
             Integer parallelism,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         return new KafkaDynamicSink(
                 physicalDataType,
@@ -394,7 +394,7 @@ public class KafkaDynamicTableFactory
                 false,
                 SinkBufferFlushMode.DISABLED,
                 parallelism,
-                inLongMetric,
+                inlongMetric,
                 auditHostAndPorts);
     }
 }
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index 6e803cfd9..e3fa28abc 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -256,7 +256,7 @@ public class UpsertKafkaDynamicTableFactory
         Duration batchInterval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL);
         SinkBufferFlushMode flushMode =
                 new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
-        String inLongMetric = tableOptions.get(INLONG_METRIC);
+        String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
         final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
 
         // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
@@ -277,7 +277,7 @@ public class UpsertKafkaDynamicTableFactory
                 true,
                 flushMode,
                 parallelism,
-                inLongMetric,
+                inlongMetric,
                 auditHostAndPorts);
     }
 
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 450990eb4..42bb53732 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -59,7 +59,6 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
         if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index d235bc860..919c227e4 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -224,7 +224,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                 TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId)
                         ? ZoneId.systemDefault()
                         : ZoneId.of(zoneId);
-        final String inlongMetric = config.get(INLONG_METRIC);
+        final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         final String inlongAudit = config.get(INLONG_AUDIT);
         ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index d5b3c676b..a7eebdbcd 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -48,7 +48,6 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher;
@@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
         if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index c3c475e92..c8a70b8bd 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -122,7 +122,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
                 DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JdbcUrlUtils.PROPERTIES_PREFIX);
 
         final ReadableConfig config = helper.getOptions();
-        final String inlongMetric = config.get(INLONG_METRIC);
+        final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         final String inlongAudit = config.get(INLONG_AUDIT);
         ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
         final String hostname = config.get(HOSTNAME);
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
index 4098cc563..1458693fb 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
@@ -59,7 +59,6 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
         if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index 830bfd078..0b4471ae3 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -112,7 +112,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
         int port = config.get(PORT);
         StartupOptions startupOptions = getStartupOptions(config);
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
-        String inlongMetric = config.get(INLONG_METRIC);
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = config.get(INLONG_AUDIT);
         ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
         return new OracleTableSource(
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index ece63db62..cd78621db 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -59,7 +59,6 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
         if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
index e13fc49ae..a886f8aef 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -125,7 +125,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
         String pluginName = config.get(DECODING_PLUGIN_NAME);
         String slotName = config.get(SLOT_NAME);
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
-        String inlongMetric = config.get(INLONG_METRIC);
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String inlongAudit = config.get(INLONG_AUDIT);
         ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
 
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index 13d6e7c21..bf74dc8d2 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -273,7 +273,9 @@ public class PulsarDynamicTableFactory implements
 
         String adminUrl = tableOptions.get(ADMIN_URL);
         String serviceUrl = tableOptions.get(SERVICE_URL);
-        String inlongMetric = tableOptions.get(INLONG_METRIC);
+
+        String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+
         String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
 
         return createPulsarTableSource(
@@ -361,7 +363,7 @@ public class PulsarDynamicTableFactory implements
             String adminUrl,
             Properties properties,
             PulsarTableOptions.StartupOptions startupOptions,
-            String inLongMetric,
+            String inlongMetric,
             String auditHostAndPorts) {
         return new PulsarDynamicTableSource(
                 physicalDataType,
@@ -377,7 +379,7 @@ public class PulsarDynamicTableFactory implements
                 properties,
                 startupOptions,
                 false,
-                inLongMetric,
+                inlongMetric,
                 auditHostAndPorts);
     }
 }
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index c709daa66..ff0361b16 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -187,7 +187,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
         String serverUrl = tableOptions.get(SERVICE_URL);
         List<String> topics = tableOptions.get(TOPIC);
         String topicPattern = tableOptions.get(TOPIC_PATTERN);
-        String inlongMetric = tableOptions.get(INLONG_METRIC);
+        String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
         String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
 
         return new PulsarDynamicTableSource(
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
index c1cea9dba..c28115846 100644
--- a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
@@ -217,11 +217,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     private SourceMetricData metricData;
 
-    private String inLongGroupId;
+    private String inlongGroupId;
 
     private String auditHostAndPorts;
 
-    private String inLongStreamId;
+    private String inlongStreamId;
 
     private transient AuditImp auditImp;
 
@@ -413,11 +413,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         metricGroup.gauge(
                 "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
         if (StringUtils.isNotEmpty(this.inlongMetric)) {
-            String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
-            inLongGroupId = inlongMetricArray[0];
-            inLongStreamId = inlongMetricArray[1];
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            inlongGroupId = inlongMetricArray[0];
+            inlongStreamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            metricData = new SourceMetricData(inLongGroupId, inLongStreamId, nodeId, metricGroup);
+            metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, metricGroup);
             metricData.registerMetricsForNumRecordsIn();
             metricData.registerMetricsForNumBytesIn();
             metricData.registerMetricsForNumBytesInPerSecond();
@@ -511,8 +511,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         if (auditImp != null) {
             auditImp.add(
                 Constants.AUDIT_SORT_INPUT,
-                inLongGroupId,
-                inLongStreamId,
+                inlongGroupId,
+                inlongStreamId,
                 System.currentTimeMillis(),
                 1,
                 record.value().toString().getBytes(StandardCharsets.UTF_8).length);
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
index e6a7e5be6..39e6a642c 100644
--- a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
@@ -114,7 +114,7 @@ public class SqlServerTableFactory implements DynamicTableSourceFactory {
         String schemaName = config.get(SCHEMA_NAME);
         String databaseName = config.get(DATABASE_NAME);
         String tableName = config.get(TABLE_NAME);
-        String inlongMetric = config.get(INLONG_METRIC);
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String auditHostAndPorts = config.get(INLONG_AUDIT);
         ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
         int port = config.get(PORT);