You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/11 03:50:21 UTC

[inlong] branch master updated: [INLONG-5164][Agent] BinlogReader adds audit and metrics (#5467)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fad7c8ac4 [INLONG-5164][Agent] BinlogReader adds audit and metrics (#5467)
fad7c8ac4 is described below

commit fad7c8ac40974cb4d33d6a9c394e5f01efad0a6d
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Thu Aug 11 11:50:17 2022 +0800

    [INLONG-5164][Agent] BinlogReader adds audit and metrics (#5467)
---
 .../org/apache/inlong/agent/metrics/audit/AuditUtils.java    | 12 +++++++++++-
 .../inlong/agent/plugin/sources/reader/BinlogReader.java     |  7 +++++--
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index 9198ade54..465ced374 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -25,8 +25,8 @@ import org.apache.inlong.audit.util.AuditConfig;
 import java.util.HashSet;
 
 import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE;
-import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_ENABLE;
 import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_KEY_PROXYS;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_ENABLE;
 import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_PROXYS;
 
 /**
@@ -69,6 +69,16 @@ public class AuditUtils {
         }
     }
 
+    /**
+     * Add an audit metric; returns without reporting when IS_AUDIT is false.
+     */
+    public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count) {
+        if (!IS_AUDIT) {
+            return;
+        }
+        AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, 0);
+    }
+
     /**
      * add
      */
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index efd6ba926..4081b32fc 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -29,6 +29,7 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.SnapshotModeConstants;
 import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
 import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory;
@@ -58,7 +59,6 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
  */
 public class BinlogReader extends AbstractReader {
 
-    public static final String COMPONENT_NAME = "BinlogReader";
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
     public static final String JOB_DATABASE_PASSWORD = "job.binlogJob.password";
     public static final String JOB_DATABASE_HOSTNAME = "job.binlogJob.hostname";
@@ -77,7 +77,6 @@ public class BinlogReader extends AbstractReader {
     public static final String JOB_DATABASE_QUEUE_SIZE = "job.binlogJob.queueSize";
     private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReader.class);
     private static final Gson GSON = new Gson();
-    private static final String BINLOG_READER_TAG_NAME = "AgentBinlogMetric";
     private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
     /**
      * pair.left: table name
@@ -167,7 +166,11 @@ public class BinlogReader extends AbstractReader {
                             committer.markProcessed(record);
                         }
                         committer.markBatchFinished();
+                        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+                                System.currentTimeMillis(), records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
                     } catch (Exception e) {
+                        readerMetric.pluginReadFailCount.addAndGet(records.size());
                         LOGGER.error("parse binlog message error", e);
                     }
                 })