You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/11 03:50:21 UTC
[inlong] branch master updated: [INLONG-5164][Agent] BinlogReader adds audit and metrics (#5467)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fad7c8ac4 [INLONG-5164][Agent] BinlogReader adds audit and metrics (#5467)
fad7c8ac4 is described below
commit fad7c8ac40974cb4d33d6a9c394e5f01efad0a6d
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Thu Aug 11 11:50:17 2022 +0800
[INLONG-5164][Agent] BinlogReader adds audit and metrics (#5467)
---
.../org/apache/inlong/agent/metrics/audit/AuditUtils.java | 12 +++++++++++-
.../inlong/agent/plugin/sources/reader/BinlogReader.java | 7 +++++--
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index 9198ade54..465ced374 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -25,8 +25,8 @@ import org.apache.inlong.audit.util.AuditConfig;
import java.util.HashSet;
import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE;
-import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_ENABLE;
import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_KEY_PROXYS;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_ENABLE;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_PROXYS;
/**
@@ -69,6 +69,16 @@ public class AuditUtils {
}
}
+ /**
+ * Report an audit metric through AuditImp (with 0 as the last argument); does nothing when IS_AUDIT is false.
+ */
+ public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count) {
+ if (!IS_AUDIT) {
+ return;
+ }
+ AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, 0);
+ }
+
/**
* add
*/
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index efd6ba926..4081b32fc 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -29,6 +29,7 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.SnapshotModeConstants;
import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
@@ -58,7 +59,6 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
*/
public class BinlogReader extends AbstractReader {
- public static final String COMPONENT_NAME = "BinlogReader";
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
public static final String JOB_DATABASE_PASSWORD = "job.binlogJob.password";
public static final String JOB_DATABASE_HOSTNAME = "job.binlogJob.hostname";
@@ -77,7 +77,6 @@ public class BinlogReader extends AbstractReader {
public static final String JOB_DATABASE_QUEUE_SIZE = "job.binlogJob.queueSize";
private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReader.class);
private static final Gson GSON = new Gson();
- private static final String BINLOG_READER_TAG_NAME = "AgentBinlogMetric";
private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
/**
* pair.left: table name
@@ -167,7 +166,11 @@ public class BinlogReader extends AbstractReader {
committer.markProcessed(record);
}
committer.markBatchFinished();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+ System.currentTimeMillis(), records.size());
+ readerMetric.pluginReadCount.addAndGet(records.size());
} catch (Exception e) {
+ readerMetric.pluginReadFailCount.addAndGet(records.size());
LOGGER.error("parse binlog message error", e);
}
})