You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/10 08:15:49 UTC

[inlong] branch master updated: [INLONG-5446][Sort] Add reporting metric from JDBC to audit SDK and refactor according to… (#5449)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 22a32e8cb [INLONG-5446][Sort] Add reporting metric from JDBC to audit SDK and refactor according to… (#5449)
22a32e8cb is described below

commit 22a32e8cb925d7107218d049b10188e5b953e148
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Wed Aug 10 16:15:45 2022 +0800

    [INLONG-5446][Sort] Add reporting metric from JDBC to audit SDK and refactor according to… (#5449)
---
 inlong-sort/sort-connectors/jdbc/pom.xml           |  10 ++
 .../jdbc/internal/JdbcBatchingOutputFormat.java    | 114 +++++++++++++++------
 .../jdbc/internal/TableJdbcUpsertOutputFormat.java |  12 ++-
 .../apache/inlong/sort/jdbc/metric/MetricData.java | 111 --------------------
 .../jdbc/table/JdbcDynamicOutputFormatBuilder.java |  12 ++-
 .../sort/jdbc/table/JdbcDynamicTableFactory.java   |  15 ++-
 .../sort/jdbc/table/JdbcDynamicTableSink.java      |  15 ++-
 7 files changed, 128 insertions(+), 161 deletions(-)

diff --git a/inlong-sort/sort-connectors/jdbc/pom.xml b/inlong-sort/sort-connectors/jdbc/pom.xml
index 5230f8ce5..79a3a1c2c 100644
--- a/inlong-sort/sort-connectors/jdbc/pom.xml
+++ b/inlong-sort/sort-connectors/jdbc/pom.xml
@@ -33,6 +33,16 @@
     <packaging>jar</packaging>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--for clickhouse-->
         <dependency>
             <groupId>ru.yandex.clickhouse</groupId>
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 95044c8bf..55474def8 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -33,7 +33,8 @@ import org.apache.flink.connector.jdbc.utils.JdbcUtils;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.jdbc.metric.MetricData;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +43,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -51,6 +54,14 @@ import java.util.function.Function;
 
 import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
 
 /**
  * A JDBC outputFormat that supports batching records before writing records to database.
@@ -66,6 +77,7 @@ public class JdbcBatchingOutputFormat<
     private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
     private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
     private final String inLongMetric;
+    private final String auditHostAndPorts;
     private transient JdbcExec jdbcStatementExecutor;
     private transient int batchCount = 0;
     private transient volatile boolean closed = false;
@@ -74,7 +86,10 @@ public class JdbcBatchingOutputFormat<
     private transient volatile Exception flushException;
     private transient RuntimeContext runtimeContext;
 
-    private MetricData metricData;
+    private SinkMetricData sinkMetricData;
+    private String inLongGroupId;
+    private String inLongStreamId;
+    private transient AuditImp auditImp;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
 
@@ -83,12 +98,14 @@ public class JdbcBatchingOutputFormat<
             @Nonnull JdbcExecutionOptions executionOptions,
             @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
             @Nonnull RecordExtractor<In, JdbcIn> recordExtractor,
-            String inLongMetric) {
+            String inLongMetric,
+            String auditHostAndPorts) {
         super(connectionProvider);
         this.executionOptions = checkNotNull(executionOptions);
         this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
         this.jdbcRecordExtractor = checkNotNull(recordExtractor);
         this.inLongMetric = inLongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
     }
 
     public static Builder builder() {
@@ -120,19 +137,28 @@ public class JdbcBatchingOutputFormat<
     public void open(int taskNumber, int numTasks) throws IOException {
         super.open(taskNumber, numTasks);
         this.runtimeContext = getRuntimeContext();
-        metricData = new MetricData(runtimeContext.getMetricGroup());
+        sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
         if (inLongMetric != null && !inLongMetric.isEmpty()) {
-            String[] inLongMetricArray = inLongMetric.split("&");
-            String groupId = inLongMetricArray[0];
-            String streamId = inLongMetricArray[1];
+            String[] inLongMetricArray = inLongMetric.split(DELIMITER);
+            inLongGroupId = inLongMetricArray[0];
+            inLongStreamId = inLongMetricArray[1];
             String nodeId = inLongMetricArray[2];
-            metricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
-            metricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
-            metricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
-            metricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
-            metricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
-            metricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
-                    "numRecordsOutPerSecond");
+            sinkMetricData.registerMetricsForDirtyBytes(inLongGroupId, inLongStreamId,
+                    nodeId, DIRTY_BYTES);
+            sinkMetricData.registerMetricsForDirtyRecords(inLongGroupId, inLongStreamId,
+                    nodeId, DIRTY_RECORDS);
+            sinkMetricData.registerMetricsForNumBytesOut(inLongGroupId, inLongStreamId,
+                    nodeId, NUM_BYTES_OUT);
+            sinkMetricData.registerMetricsForNumRecordsOut(inLongGroupId, inLongStreamId,
+                    nodeId, NUM_RECORDS_OUT);
+            sinkMetricData.registerMetricsForNumBytesOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
+                    NUM_BYTES_OUT_PER_SECOND);
+            sinkMetricData.registerMetricsForNumRecordsOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
+                    NUM_RECORDS_OUT_PER_SECOND);
+        }
+        if (auditHostAndPorts != null) {
+            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+            auditImp = AuditImp.getInstance();
         }
         jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
         if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
@@ -146,20 +172,20 @@ public class JdbcBatchingOutputFormat<
                                     if (!closed) {
                                         try {
                                             flush();
-                                            if (metricData.getNumRecordsOut() != null) {
-                                                metricData.getNumRecordsOut().inc(rowSize);
+                                            if (sinkMetricData.getNumRecordsOut() != null) {
+                                                sinkMetricData.getNumRecordsOut().inc(rowSize);
                                             }
-                                            if (metricData.getNumBytesOut() != null) {
-                                                metricData.getNumBytesOut()
+                                            if (sinkMetricData.getNumBytesOut() != null) {
+                                                sinkMetricData.getNumBytesOut()
                                                         .inc(dataSize);
                                             }
                                             resetStateAfterFlush();
                                         } catch (Exception e) {
-                                            if (metricData.getDirtyRecords() != null) {
-                                                metricData.getDirtyRecords().inc(rowSize);
+                                            if (sinkMetricData.getDirtyRecords() != null) {
+                                                sinkMetricData.getDirtyRecords().inc(rowSize);
                                             }
-                                            if (metricData.getDirtyBytes() != null) {
-                                                metricData.getDirtyBytes().inc(dataSize);
+                                            if (sinkMetricData.getDirtyBytes() != null) {
+                                                sinkMetricData.getDirtyBytes().inc(dataSize);
                                             }
                                             resetStateAfterFlush();
                                             flushException = e;
@@ -190,33 +216,46 @@ public class JdbcBatchingOutputFormat<
         }
     }
 
+    private void outputMetricForAudit(long length) {
+        if (auditImp != null) {
+            auditImp.add(
+                    AUDIT_SORT_INPUT,
+                    inLongGroupId,
+                    inLongStreamId,
+                    System.currentTimeMillis(),
+                    1,
+                    length);
+        }
+    }
+
     @Override
     public final synchronized void writeRecord(In record) throws IOException {
         checkFlushException();
 
         rowSize++;
         dataSize = dataSize + record.toString().getBytes(StandardCharsets.UTF_8).length;
+        outputMetricForAudit(dataSize);
         try {
             addToBatch(record, jdbcRecordExtractor.apply(record));
             batchCount++;
             if (executionOptions.getBatchSize() > 0
                     && batchCount >= executionOptions.getBatchSize()) {
                 flush();
-                if (metricData.getNumRecordsOut() != null) {
-                    metricData.getNumRecordsOut().inc(rowSize);
+                if (sinkMetricData.getNumRecordsOut() != null) {
+                    sinkMetricData.getNumRecordsOut().inc(rowSize);
                 }
-                if (metricData.getNumBytesOut() != null) {
-                    metricData.getNumBytesOut()
+                if (sinkMetricData.getNumBytesOut() != null) {
+                    sinkMetricData.getNumBytesOut()
                             .inc(dataSize);
                 }
                 resetStateAfterFlush();
             }
         } catch (Exception e) {
-            if (metricData.getDirtyRecords() != null) {
-                metricData.getDirtyRecords().inc(rowSize);
+            if (sinkMetricData.getDirtyRecords() != null) {
+                sinkMetricData.getDirtyRecords().inc(rowSize);
             }
-            if (metricData.getDirtyBytes() != null) {
-                metricData.getDirtyBytes().inc(dataSize);
+            if (sinkMetricData.getDirtyBytes() != null) {
+                sinkMetricData.getDirtyBytes().inc(dataSize);
             }
             resetStateAfterFlush();
             throw new IOException("Writing records to JDBC failed.", e);
@@ -346,6 +385,7 @@ public class JdbcBatchingOutputFormat<
         private String[] keyFields;
         private int[] fieldTypes;
         private String inLongMetric;
+        private String auditHostAndPorts;
         private JdbcExecutionOptions.Builder executionOptionsBuilder =
                 JdbcExecutionOptions.builder();
 
@@ -389,6 +429,14 @@ public class JdbcBatchingOutputFormat<
             return this;
         }
 
+        /**
+         * auditHostAndPorts
+         */
+        public Builder setAuditHostAndPorts(String auditHostAndPorts) {
+            this.auditHostAndPorts = auditHostAndPorts;
+            return this;
+        }
+
         /**
          * optional, flush max size (includes all append, upsert and delete records), over this
          * number of records, will flush data.
@@ -436,7 +484,8 @@ public class JdbcBatchingOutputFormat<
                         new SimpleJdbcConnectionProvider(options),
                         dml,
                         executionOptionsBuilder.build(),
-                        inLongMetric);
+                        inLongMetric,
+                        auditHostAndPorts);
             } else {
                 // warn: don't close over builder fields
                 String sql =
@@ -457,7 +506,8 @@ public class JdbcBatchingOutputFormat<
                             Preconditions.checkArgument(tuple2.f0);
                             return tuple2.f1;
                         },
-                        inLongMetric);
+                        inLongMetric,
+                        auditHostAndPorts);
             }
         }
     }
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
index 3d293ada1..4ef1ff2e3 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -58,13 +58,15 @@ class TableJdbcUpsertOutputFormat
             JdbcConnectionProvider connectionProvider,
             JdbcDmlOptions dmlOptions,
             JdbcExecutionOptions batchOptions,
-            String inLongMetric) {
+            String inLongMetric,
+            String auditHostAndPorts) {
         this(
                 connectionProvider,
                 batchOptions,
                 ctx -> createUpsertRowExecutor(dmlOptions, ctx),
                 ctx -> createDeleteExecutor(dmlOptions, ctx),
-                inLongMetric);
+                inLongMetric,
+                auditHostAndPorts);
     }
 
     @VisibleForTesting
@@ -74,9 +76,11 @@ class TableJdbcUpsertOutputFormat
             StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> statementExecutorFactory,
             StatementExecutorFactory<JdbcBatchStatementExecutor<Row>>
                     deleteStatementExecutorFactory,
-            String inLongMetric
+            String inLongMetric,
+            String auditHostAndPorts
     ) {
-        super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1, inLongMetric);
+        super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1,
+                inLongMetric, auditHostAndPorts);
         this.deleteStatementExecutorFactory = deleteStatementExecutorFactory;
     }
 
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/metric/MetricData.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/metric/MetricData.java
deleted file mode 100644
index af136efcf..000000000
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/metric/MetricData.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *   Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.inlong.sort.jdbc.metric;
-
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
-
-/**
- * A collection class for handling metrics
- */
-public class MetricData {
-
-    private static Integer TIME_SPAN_IN_SECONDS = 60;
-    private static String STREAM_ID = "streamId";
-    private static String GROUP_ID = "groupId";
-    private static String NODE_ID = "nodeId";
-    private final MetricGroup metricGroup;
-    private Counter numRecordsOut;
-    private Counter numBytesOut;
-    private Counter dirtyRecords;
-    private Counter dirtyBytes;
-    private Meter numRecordsOutPerSecond;
-    private Meter numBytesOutPerSecond;
-
-    public MetricData(MetricGroup metricGroup) {
-        this.metricGroup = metricGroup;
-    }
-
-    public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName) {
-        numRecordsOut =
-                metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName);
-    }
-
-    public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName) {
-        numBytesOut =
-                metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName);
-    }
-
-    public void registerMetricsForNumRecordsOutPerSecond(String groupId, String streamId, String nodeId,
-            String metricName) {
-        numRecordsOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
-                        nodeId)
-                .meter(metricName, new MeterView(this.numRecordsOut, TIME_SPAN_IN_SECONDS));
-    }
-
-    public void registerMetricsForNumBytesOutPerSecond(String groupId, String streamId, String nodeId,
-            String metricName) {
-        numBytesOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId)
-                .addGroup(NODE_ID, nodeId)
-                .meter(metricName, new MeterView(this.numBytesOut, TIME_SPAN_IN_SECONDS));
-    }
-
-    public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
-            String metricName) {
-        dirtyRecords = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                .counter(metricName);
-    }
-
-    public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
-            String metricName) {
-        dirtyBytes =
-                metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
-                        .counter(metricName);
-
-    }
-
-    public Counter getNumRecordsOut() {
-        return numRecordsOut;
-    }
-
-    public Counter getNumBytesOut() {
-        return numBytesOut;
-    }
-
-    public Counter getDirtyRecords() {
-        return dirtyRecords;
-    }
-
-    public Counter getDirtyBytes() {
-        return dirtyBytes;
-    }
-
-    public Meter getNumRecordsOutPerSecond() {
-        return numRecordsOutPerSecond;
-    }
-
-    public Meter getNumBytesOutPerSecond() {
-        return numBytesOutPerSecond;
-    }
-
-}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index 33991d220..1e0369e2c 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -65,6 +65,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
     private TypeInformation<RowData> rowDataTypeInformation;
     private DataType[] fieldDataTypes;
     private String inLongMetric;
+    private String auditHostAndPorts;
 
     public JdbcDynamicOutputFormatBuilder() {
 
@@ -240,6 +241,11 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
         return this;
     }
 
+    public JdbcDynamicOutputFormatBuilder setAuditHostAndPorts(String auditHostAndPorts) {
+        this.auditHostAndPorts = auditHostAndPorts;
+        return this;
+    }
+
     public JdbcBatchingOutputFormat<RowData, ?, ?> build() {
         checkNotNull(jdbcOptions, "jdbc options can not be null");
         checkNotNull(dmlOptions, "jdbc dml options can not be null");
@@ -258,7 +264,8 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
                             createBufferReduceExecutor(
                                     dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
                     JdbcBatchingOutputFormat.RecordExtractor.identity(),
-                    inLongMetric);
+                    inLongMetric,
+                    auditHostAndPorts);
         } else {
             // append only query
             final String sql =
@@ -278,7 +285,8 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
                                     sql,
                                     rowDataTypeInformation),
                     JdbcBatchingOutputFormat.RecordExtractor.identity(),
-                    inLongMetric);
+                    inLongMetric,
+                    auditHostAndPorts);
         }
     }
 }
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index f60c3f8c8..328f86a45 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -44,6 +44,8 @@ import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
 /**
  * Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
@@ -174,12 +176,6 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
                     .defaultValue(false)
                     .withDescription("Whether to support sink update/delete data without primaryKey.");
 
-    public static final ConfigOption<String> INLONG_METRIC =
-            ConfigOptions.key("inlong.metric")
-                    .stringType()
-                    .defaultValue("")
-                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
         final FactoryUtil.TableFactoryHelper helper =
@@ -192,14 +188,16 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
         TableSchema physicalSchema =
                 TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
         boolean appendMode = config.get(SINK_APPEND_MODE);
-        String inLongMetric = config.get(INLONG_METRIC);
+        String inLongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = config.getOptional(INLONG_AUDIT).orElse(null);
         return new JdbcDynamicTableSink(
                 jdbcOptions,
                 getJdbcExecutionOptions(config),
                 getJdbcDmlOptions(jdbcOptions, physicalSchema),
                 physicalSchema,
                 appendMode,
-                inLongMetric);
+                inLongMetric,
+                auditHostAndPorts);
     }
 
     @Override
@@ -326,6 +324,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
         optionalOptions.add(MAX_RETRY_TIMEOUT);
         optionalOptions.add(DIALECT_IMPL);
         optionalOptions.add(INLONG_METRIC);
+        optionalOptions.add(INLONG_AUDIT);
         return optionalOptions;
     }
 
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
index 69998c1be..92e54e816 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -51,6 +51,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
     private final String dialectName;
 
     private final String inLongMetric;
+    private final String auditHostAndPorts;
     private final boolean appendMode;
 
     public JdbcDynamicTableSink(
@@ -59,7 +60,8 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
             JdbcDmlOptions dmlOptions,
             TableSchema tableSchema,
             boolean appendMode,
-            String inLongMetric) {
+            String inLongMetric,
+            String auditHostAndPorts) {
         this.jdbcOptions = jdbcOptions;
         this.executionOptions = executionOptions;
         this.dmlOptions = dmlOptions;
@@ -67,6 +69,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
         this.dialectName = dmlOptions.getDialect().dialectName();
         this.appendMode = appendMode;
         this.inLongMetric = inLongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
     }
 
     @Override
@@ -99,6 +102,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
         builder.setRowDataTypeInfo(rowDataTypeInformation);
         builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
         builder.setInLongMetric(inLongMetric);
+        builder.setAuditHostAndPorts(auditHostAndPorts);
         return SinkFunctionProvider.of(
                 new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
     }
@@ -106,7 +110,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
     @Override
     public DynamicTableSink copy() {
         return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions,
-                tableSchema, appendMode, inLongMetric);
+                tableSchema, appendMode, inLongMetric, auditHostAndPorts);
     }
 
     @Override
@@ -127,11 +131,14 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
                 && Objects.equals(executionOptions, that.executionOptions)
                 && Objects.equals(dmlOptions, that.dmlOptions)
                 && Objects.equals(tableSchema, that.tableSchema)
-                && Objects.equals(dialectName, that.dialectName);
+                && Objects.equals(dialectName, that.dialectName)
+                && Objects.equals(inLongMetric, that.inLongMetric)
+                && Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName);
+        return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName,
+                inLongMetric, auditHostAndPorts);
     }
 }
\ No newline at end of file