You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 07:05:09 UTC

[inlong] branch master updated: [INLONG-6009][Agent] Report data size to InLong Audit (#6011)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 48f6f152a [INLONG-6009][Agent] Report data size to InLong Audit (#6011)
48f6f152a is described below

commit 48f6f152a9b59cb931c39e6c2d649409d93491eb
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Mon Sep 26 15:05:03 2022 +0800

    [INLONG-6009][Agent] Report data size to InLong Audit (#6011)
---
 .../inlong/agent/metrics/audit/AuditUtils.java     | 15 +---
 .../inlong/agent/plugin/sinks/SenderManager.java   |  8 +-
 .../agent/plugin/sources/reader/BinlogReader.java  |  3 +-
 .../agent/plugin/sources/reader/KafkaReader.java   |  2 +-
 .../agent/plugin/sources/reader/MongoDBReader.java | 89 +++++++++++-----------
 .../plugin/sources/reader/PostgreSQLReader.java    |  3 +-
 .../agent/plugin/sources/reader/RedisReader.java   |  5 +-
 .../plugin/sources/reader/SQLServerReader.java     | 12 +--
 .../agent/plugin/sources/reader/SqlReader.java     |  3 +-
 .../sources/reader/file/FileReaderOperator.java    |  4 +-
 10 files changed, 69 insertions(+), 75 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index 465ced374..d3ad42538 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -72,21 +72,12 @@ public class AuditUtils {
     /**
      * add audit metric
      */
-    public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count) {
+    public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count,
+            long size) {
         if (!IS_AUDIT) {
             return;
         }
-        AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, 0);
-    }
-
-    /**
-     * add
-     */
-    public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime) {
-        if (!IS_AUDIT) {
-            return;
-        }
-        AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, 0);
+        AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size);
     }
 
     /**
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 3b8d1ee95..dd164847d 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -264,7 +264,9 @@ public class SenderManager {
             if (result == SendResult.OK) {
                 semaphore.release(bodyList.size());
                 getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
-                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size());
+                long totalSize = bodyList.stream().mapToLong(body -> body.length).sum();
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size(),
+                        totalSize);
             } else {
                 getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size());
                 LOGGER.warn("send data to dataproxy error {}", result.toString());
@@ -319,7 +321,9 @@ public class SenderManager {
             Map<String, String> dims = new HashMap<>();
             dims.put(KEY_INLONG_GROUP_ID, groupId);
             dims.put(KEY_INLONG_STREAM_ID, streamId);
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size());
+            long totalSize = bodyList.stream().mapToLong(body -> body.length).sum();
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size(),
+                    totalSize);
             getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
             if (sourcePath != null) {
                 taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 21188de90..3fab3899c 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -167,8 +167,9 @@ public class BinlogReader extends AbstractReader {
                             committer.markProcessed(record);
                         }
                         committer.markBatchFinished();
+                        long dataSize = records.stream().mapToLong(r -> r.value().length()).sum();
                         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
-                                System.currentTimeMillis(), records.size());
+                                System.currentTimeMillis(), records.size(), dataSize);
                         readerMetric.pluginReadCount.addAndGet(records.size());
                     } catch (Exception e) {
                         readerMetric.pluginReadFailCount.addAndGet(records.size());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 5c5f0e240..79e5479ee 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -112,7 +112,7 @@ public class KafkaReader<K, V> extends AbstractReader {
             byte[] recordValue = (byte[]) record.value();
             if (validateMessage(recordValue)) {
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
-                        inlongGroupId, inlongStreamId, System.currentTimeMillis());
+                        inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, recordValue.length);
                 // header
                 Map<String, String> headerMap = new HashMap<>();
                 headerMap.put("record.offset", String.valueOf(record.offset()));
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
index b085e0b0f..c7f0341f5 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
@@ -45,68 +45,68 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.HOSTS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.USER;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.PASSWORD;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CAPTURE_MODE;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST;
 import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS;
 import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS;
 import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_TIMEOUT_MS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SOCKET_TIMEOUT_MS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ENABLED;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES;
 import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_INCLUDE_LIST;
 import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_INCLUDE_LIST;
 import static io.debezium.connector.mongodb.MongoDbConnectorConfig.FIELD_EXCLUDE_LIST;
 import static io.debezium.connector.mongodb.MongoDbConnectorConfig.FIELD_RENAMES;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.HOSTS;
 import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_COPY_THREADS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.PASSWORD;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS;
 import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SNAPSHOT_MODE;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CAPTURE_MODE;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SOCKET_TIMEOUT_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ENABLED;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.USER;
 import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_HOSTS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_USER;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_PASSWORD;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_INCLUDE_LIST;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_EXCLUDE_LIST;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_INCLUDE_LIST;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_EXCLUDE_LIST;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_EXCLUDE_LIST;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SNAPSHOT_MODE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_INITIAL_DELAY;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_MAX_DELAY;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CAPTURE_MODE;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_QUEUE_SIZE;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_STORE_HISTORY_FILENAME;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSETS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_EXCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_INCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_MAX_ATTEMPTS;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_TIMEOUT_MS;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CURSOR_MAX_AWAIT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SOCKET_TIMEOUT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SELECTION_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_EXCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_INCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_EXCLUDE_LIST;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_RENAMES;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_MEMBERS_DISCOVER;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_MAX_ATTEMPTS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_MAX_DELAY;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_INITIAL_DELAY;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_HOSTS;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_INITIAL_SYNC_MAX_THREADS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_ENABLE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_MEMBERS_DISCOVER;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSETS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_PASSWORD;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_POLL_INTERVAL;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_QUEUE_SIZE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SELECTION_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SNAPSHOT_MODE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SOCKET_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_ENABLE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_STORE_HISTORY_FILENAME;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_USER;
 
 /**
  * MongoDBReader : mongo source, split mongo source job into multi readers
@@ -254,10 +254,10 @@ public class MongoDBReader extends AbstractReader {
      * Handle the completion of the embedded connector engine.
      *
      * @param success {@code true} if the connector completed normally,
-     *                or {@code false} if the connector produced an error
-     *                that prevented startup or premature termination.
+     *         or {@code false} if the connector produced an error
+     *         that prevented startup or premature termination.
      * @param message the completion message; never null
-     * @param error   the error, or null if there was no exception
+     * @param error the error, or null if there was no exception
      */
     private void handle(boolean success, String message, Throwable error) {
         //jobConf.getInstanceId()
@@ -318,7 +318,7 @@ public class MongoDBReader extends AbstractReader {
     }
 
     private void setEngineConfigIfNecessary(JobProfile jobConf,
-                                            Configuration.Builder builder, String key, Field field) {
+            Configuration.Builder builder, String key, Field field) {
         String value = jobConf.get(key, field.defaultValueAsString());
         if (StringUtils.isBlank(value)) {
             return;
@@ -352,11 +352,11 @@ public class MongoDBReader extends AbstractReader {
      * Handles a batch of records, calling the {@link DebeziumEngine.RecordCommitter#markProcessed(Object)}
      * for each record and {@link DebeziumEngine.RecordCommitter#markBatchFinished()} when this batch is finished.
      *
-     * @param records   the records to be processed
+     * @param records the records to be processed
      * @param committer the committer that indicates to the system that we are finished
      */
     private void handleChangeEvent(List<ChangeEvent<String, String>> records,
-                                   DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) {
+            DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) {
         try {
             for (ChangeEvent<String, String> record : records) {
                 DebeziumFormat debeziumFormat = JSONPath.read(record.value(), "$.payload", DebeziumFormat.class);
@@ -364,8 +364,9 @@ public class MongoDBReader extends AbstractReader {
                 committer.markProcessed(record);
             }
             committer.markBatchFinished();
+            long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
             AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, super.inlongGroupId, super.inlongStreamId,
-                    System.currentTimeMillis(), records.size());
+                    System.currentTimeMillis(), records.size(), dataSize);
             readerMetric.pluginReadCount.addAndGet(records.size());
         } catch (InterruptedException e) {
             e.printStackTrace();
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
index 5d78c2787..dc7587bc7 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
@@ -162,8 +162,9 @@ public class PostgreSQLReader extends AbstractReader {
                             committer.markProcessed(record);
                         }
                         committer.markBatchFinished();
+                        long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
                         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
-                                System.currentTimeMillis(), records.size());
+                                System.currentTimeMillis(), records.size(), dataSize);
                         readerMetric.pluginReadCount.addAndGet(records.size());
                     } catch (Exception e) {
                         readerMetric.pluginReadFailCount.addAndGet(records.size());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
index a59f7eaf5..b90dc36b9 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
@@ -108,9 +108,10 @@ public class RedisReader extends AbstractReader {
                 public void onEvent(Replicator replicator, Event event) {
                     try {
                         if (event instanceof DefaultCommand || event instanceof KeyValuePair<?, ?>) {
-                            redisMessageQueue.put(gson.toJson(event));
+                            String eventJson = gson.toJson(event);
+                            redisMessageQueue.put(eventJson);
                             AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
-                                    System.currentTimeMillis(), 1);
+                                    System.currentTimeMillis(), 1, eventJson.length());
                             readerMetric.pluginReadCount.incrementAndGet();
                         }
                         if (event instanceof PostRdbSyncEvent) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
index 66d023e0f..9edb14ba3 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
@@ -46,29 +46,22 @@ import static java.sql.Types.VARBINARY;
  */
 public class SQLServerReader extends AbstractReader {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
-
     public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric";
-
     public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
     public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
     public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname";
     public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port";
     public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname";
-
     public static final String JOB_DATABASE_BATCH_SIZE = "job.sqlserverJob.batchSize";
     public static final int DEFAULT_JOB_DATABASE_BATCH_SIZE = 1000;
-
     public static final String JOB_DATABASE_DRIVER_CLASS = "job.database.driverClass";
     public static final String DEFAULT_JOB_DATABASE_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
-
     public static final String STD_FIELD_SEPARATOR_SHORT = "\001";
     public static final String JOB_DATABASE_SEPARATOR = "job.sql.separator";
-
     // pre-set sql lines, commands like "set xxx=xx;"
     public static final String JOB_DATABASE_TYPE = "job.database.type";
     public static final String SQLSERVER = "sqlserver";
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
     private static final String[] NEW_LINE_CHARS = new String[]{String.valueOf(CharUtils.CR),
             String.valueOf(CharUtils.LF)};
     private static final String[] EMPTY_CHARS = new String[]{StringUtils.EMPTY, StringUtils.EMPTY};
@@ -116,8 +109,9 @@ public class SQLServerReader extends AbstractReader {
                 }
                 lineColumns.add(dataValue);
             }
+            long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum();
             AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
-                    System.currentTimeMillis());
+                    System.currentTimeMillis(), 1, dataSize);
             readerMetric.pluginReadCount.incrementAndGet();
             return generateMessage(lineColumns);
         } catch (Exception ex) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index 5bf9ac134..b1b0c2280 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -115,8 +115,9 @@ public class SqlReader extends AbstractReader {
                     }
                     lineColumns.add(dataValue);
                 }
+                long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum();
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
-                        inlongGroupId, inlongStreamId, System.currentTimeMillis());
+                        inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, dataSize);
                 readerMetric.pluginReadCount.incrementAndGet();
                 return generateMessage(lineColumns);
             } else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 3419ee160..83d6e2a97 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -91,8 +91,8 @@ public class FileReaderOperator extends AbstractReader {
         if (iterator != null && iterator.hasNext()) {
             String message = iterator.next();
             if (validateMessage(message)) {
-                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
-                        inlongGroupId, inlongStreamId, System.currentTimeMillis());
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+                        System.currentTimeMillis(), 1, message.length());
                 readerMetric.pluginReadCount.incrementAndGet();
                 String proxyPartitionKey = jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
                 Map<String, String> header = new HashMap<>();