You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 07:05:09 UTC
[inlong] branch master updated: [INLONG-6009][Agent] Report data size to InLong Audit (#6011)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 48f6f152a [INLONG-6009][Agent] Report data size to InLong Audit (#6011)
48f6f152a is described below
commit 48f6f152a9b59cb931c39e6c2d649409d93491eb
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Mon Sep 26 15:05:03 2022 +0800
[INLONG-6009][Agent] Report data size to InLong Audit (#6011)
---
.../inlong/agent/metrics/audit/AuditUtils.java | 15 +---
.../inlong/agent/plugin/sinks/SenderManager.java | 8 +-
.../agent/plugin/sources/reader/BinlogReader.java | 3 +-
.../agent/plugin/sources/reader/KafkaReader.java | 2 +-
.../agent/plugin/sources/reader/MongoDBReader.java | 89 +++++++++++-----------
.../plugin/sources/reader/PostgreSQLReader.java | 3 +-
.../agent/plugin/sources/reader/RedisReader.java | 5 +-
.../plugin/sources/reader/SQLServerReader.java | 12 +--
.../agent/plugin/sources/reader/SqlReader.java | 3 +-
.../sources/reader/file/FileReaderOperator.java | 4 +-
10 files changed, 69 insertions(+), 75 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index 465ced374..d3ad42538 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -72,21 +72,12 @@ public class AuditUtils {
/**
* add audit metric
*/
- public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count) {
+ public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count,
+ long size) {
if (!IS_AUDIT) {
return;
}
- AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, 0);
- }
-
- /**
- * add
- */
- public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime) {
- if (!IS_AUDIT) {
- return;
- }
- AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, 0);
+ AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size);
}
/**
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 3b8d1ee95..dd164847d 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -264,7 +264,9 @@ public class SenderManager {
if (result == SendResult.OK) {
semaphore.release(bodyList.size());
getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size());
+ long totalSize = bodyList.stream().mapToLong(body -> body.length).sum();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size(),
+ totalSize);
} else {
getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size());
LOGGER.warn("send data to dataproxy error {}", result.toString());
@@ -319,7 +321,9 @@ public class SenderManager {
Map<String, String> dims = new HashMap<>();
dims.put(KEY_INLONG_GROUP_ID, groupId);
dims.put(KEY_INLONG_STREAM_ID, streamId);
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size());
+ long totalSize = bodyList.stream().mapToLong(body -> body.length).sum();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size(),
+ totalSize);
getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
if (sourcePath != null) {
taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 21188de90..3fab3899c 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -167,8 +167,9 @@ public class BinlogReader extends AbstractReader {
committer.markProcessed(record);
}
committer.markBatchFinished();
+ long dataSize = records.stream().mapToLong(r -> r.value().length()).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
- System.currentTimeMillis(), records.size());
+ System.currentTimeMillis(), records.size(), dataSize);
readerMetric.pluginReadCount.addAndGet(records.size());
} catch (Exception e) {
readerMetric.pluginReadFailCount.addAndGet(records.size());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 5c5f0e240..79e5479ee 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -112,7 +112,7 @@ public class KafkaReader<K, V> extends AbstractReader {
byte[] recordValue = (byte[]) record.value();
if (validateMessage(recordValue)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
- inlongGroupId, inlongStreamId, System.currentTimeMillis());
+ inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, recordValue.length);
// header
Map<String, String> headerMap = new HashMap<>();
headerMap.put("record.offset", String.valueOf(record.offset()));
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
index b085e0b0f..c7f0341f5 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
@@ -45,68 +45,68 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.HOSTS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.USER;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.PASSWORD;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CAPTURE_MODE;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST;
import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS;
import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS;
import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_TIMEOUT_MS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SOCKET_TIMEOUT_MS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ENABLED;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES;
import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_INCLUDE_LIST;
import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_INCLUDE_LIST;
import static io.debezium.connector.mongodb.MongoDbConnectorConfig.FIELD_EXCLUDE_LIST;
import static io.debezium.connector.mongodb.MongoDbConnectorConfig.FIELD_RENAMES;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.HOSTS;
import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_COPY_THREADS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.PASSWORD;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS;
import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SNAPSHOT_MODE;
-import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CAPTURE_MODE;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SOCKET_TIMEOUT_MS;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ENABLED;
+import static io.debezium.connector.mongodb.MongoDbConnectorConfig.USER;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_HOSTS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_USER;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_PASSWORD;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_INCLUDE_LIST;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_EXCLUDE_LIST;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_INCLUDE_LIST;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_EXCLUDE_LIST;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_EXCLUDE_LIST;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SNAPSHOT_MODE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_INITIAL_DELAY;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_MAX_DELAY;
import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CAPTURE_MODE;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_QUEUE_SIZE;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_STORE_HISTORY_FILENAME;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSETS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_EXCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_INCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_MAX_ATTEMPTS;
import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_TIMEOUT_MS;
import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CURSOR_MAX_AWAIT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SOCKET_TIMEOUT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SELECTION_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_EXCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_INCLUDE_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_EXCLUDE_LIST;
import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_RENAMES;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_MEMBERS_DISCOVER;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_MAX_ATTEMPTS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_MAX_DELAY;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_INITIAL_DELAY;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_HOSTS;
import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_INITIAL_SYNC_MAX_THREADS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_ENABLE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_MEMBERS_DISCOVER;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSETS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_PASSWORD;
import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_POLL_INTERVAL;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_QUEUE_SIZE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SELECTION_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SNAPSHOT_MODE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SOCKET_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_ENABLE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_STORE_HISTORY_FILENAME;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_USER;
/**
* MongoDBReader : mongo source, split mongo source job into multi readers
@@ -254,10 +254,10 @@ public class MongoDBReader extends AbstractReader {
* Handle the completion of the embedded connector engine.
*
* @param success {@code true} if the connector completed normally,
- * or {@code false} if the connector produced an error
- * that prevented startup or premature termination.
+ * or {@code false} if the connector produced an error
+ * that prevented startup or premature termination.
* @param message the completion message; never null
- * @param error the error, or null if there was no exception
+ * @param error the error, or null if there was no exception
*/
private void handle(boolean success, String message, Throwable error) {
//jobConf.getInstanceId()
@@ -318,7 +318,7 @@ public class MongoDBReader extends AbstractReader {
}
private void setEngineConfigIfNecessary(JobProfile jobConf,
- Configuration.Builder builder, String key, Field field) {
+ Configuration.Builder builder, String key, Field field) {
String value = jobConf.get(key, field.defaultValueAsString());
if (StringUtils.isBlank(value)) {
return;
@@ -352,11 +352,11 @@ public class MongoDBReader extends AbstractReader {
* Handles a batch of records, calling the {@link DebeziumEngine.RecordCommitter#markProcessed(Object)}
* for each record and {@link DebeziumEngine.RecordCommitter#markBatchFinished()} when this batch is finished.
*
- * @param records the records to be processed
+ * @param records the records to be processed
* @param committer the committer that indicates to the system that we are finished
*/
private void handleChangeEvent(List<ChangeEvent<String, String>> records,
- DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) {
+ DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) {
try {
for (ChangeEvent<String, String> record : records) {
DebeziumFormat debeziumFormat = JSONPath.read(record.value(), "$.payload", DebeziumFormat.class);
@@ -364,8 +364,9 @@ public class MongoDBReader extends AbstractReader {
committer.markProcessed(record);
}
committer.markBatchFinished();
+ long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, super.inlongGroupId, super.inlongStreamId,
- System.currentTimeMillis(), records.size());
+ System.currentTimeMillis(), records.size(), dataSize);
readerMetric.pluginReadCount.addAndGet(records.size());
} catch (InterruptedException e) {
e.printStackTrace();
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
index 5d78c2787..dc7587bc7 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
@@ -162,8 +162,9 @@ public class PostgreSQLReader extends AbstractReader {
committer.markProcessed(record);
}
committer.markBatchFinished();
+ long dataSize = records.stream().mapToLong(c -> c.value().length()).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
- System.currentTimeMillis(), records.size());
+ System.currentTimeMillis(), records.size(), dataSize);
readerMetric.pluginReadCount.addAndGet(records.size());
} catch (Exception e) {
readerMetric.pluginReadFailCount.addAndGet(records.size());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
index a59f7eaf5..b90dc36b9 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
@@ -108,9 +108,10 @@ public class RedisReader extends AbstractReader {
public void onEvent(Replicator replicator, Event event) {
try {
if (event instanceof DefaultCommand || event instanceof KeyValuePair<?, ?>) {
- redisMessageQueue.put(gson.toJson(event));
+ String eventJson = gson.toJson(event);
+ redisMessageQueue.put(eventJson);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
- System.currentTimeMillis(), 1);
+ System.currentTimeMillis(), 1, eventJson.length());
readerMetric.pluginReadCount.incrementAndGet();
}
if (event instanceof PostRdbSyncEvent) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
index 66d023e0f..9edb14ba3 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
@@ -46,29 +46,22 @@ import static java.sql.Types.VARBINARY;
*/
public class SQLServerReader extends AbstractReader {
- private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
-
public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric";
-
public static final String JOB_DATABASE_USER = "job.sqlserverJob.user";
public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password";
public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname";
public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port";
public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname";
-
public static final String JOB_DATABASE_BATCH_SIZE = "job.sqlserverJob.batchSize";
public static final int DEFAULT_JOB_DATABASE_BATCH_SIZE = 1000;
-
public static final String JOB_DATABASE_DRIVER_CLASS = "job.database.driverClass";
public static final String DEFAULT_JOB_DATABASE_DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
-
public static final String STD_FIELD_SEPARATOR_SHORT = "\001";
public static final String JOB_DATABASE_SEPARATOR = "job.sql.separator";
-
// pre-set sql lines, commands like "set xxx=xx;"
public static final String JOB_DATABASE_TYPE = "job.database.type";
public static final String SQLSERVER = "sqlserver";
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
private static final String[] NEW_LINE_CHARS = new String[]{String.valueOf(CharUtils.CR),
String.valueOf(CharUtils.LF)};
private static final String[] EMPTY_CHARS = new String[]{StringUtils.EMPTY, StringUtils.EMPTY};
@@ -116,8 +109,9 @@ public class SQLServerReader extends AbstractReader {
}
lineColumns.add(dataValue);
}
+ long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
- System.currentTimeMillis());
+ System.currentTimeMillis(), 1, dataSize);
readerMetric.pluginReadCount.incrementAndGet();
return generateMessage(lineColumns);
} catch (Exception ex) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index 5bf9ac134..b1b0c2280 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -115,8 +115,9 @@ public class SqlReader extends AbstractReader {
}
lineColumns.add(dataValue);
}
+ long dataSize = lineColumns.stream().mapToLong(column -> column.length()).sum();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
- inlongGroupId, inlongStreamId, System.currentTimeMillis());
+ inlongGroupId, inlongStreamId, System.currentTimeMillis(), 1, dataSize);
readerMetric.pluginReadCount.incrementAndGet();
return generateMessage(lineColumns);
} else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 3419ee160..83d6e2a97 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -91,8 +91,8 @@ public class FileReaderOperator extends AbstractReader {
if (iterator != null && iterator.hasNext()) {
String message = iterator.next();
if (validateMessage(message)) {
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
- inlongGroupId, inlongStreamId, System.currentTimeMillis());
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+ System.currentTimeMillis(), 1, message.length());
readerMetric.pluginReadCount.incrementAndGet();
String proxyPartitionKey = jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();