You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/11/12 06:57:44 UTC

(inlong) branch master updated: [INLONG-9265][Agent] Add audit of agent send success (#9266)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e51e9edfc9 [INLONG-9265][Agent] Add audit of agent send success (#9266)
e51e9edfc9 is described below

commit e51e9edfc913c6c6bc55df111531a1c8254b82ff
Author: justinwwhuang <hw...@163.com>
AuthorDate: Sun Nov 12 14:57:39 2023 +0800

    [INLONG-9265][Agent] Add audit of agent send success (#9266)
---
 .../apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index abb21fe8ba..d13ef19963 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -28,6 +28,7 @@ import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
 import org.apache.inlong.agent.message.filecollect.SenderMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.message.SequentialID;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
@@ -47,7 +48,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
@@ -122,7 +122,6 @@ public class SenderManager {
     private List<PackageAckInfo> packageAckInfoList = new ArrayList<>();
     private final ReentrantReadWriteLock packageAckInfoLock = new ReentrantReadWriteLock(true);
     protected InstanceProfile profile;
-    private Random testRandom = new Random();
     private volatile boolean offsetRunning = false;
     private volatile boolean resendRunning = false;
     private volatile boolean started = false;
@@ -175,7 +174,6 @@ public class SenderManager {
         this.metricItemSet = new AgentMetricItemSet(metricName);
         MetricRegister.register(metricItemSet);
         resendQueue = new LinkedBlockingQueue<>();
-
     }
 
     public void Start() throws Exception {
@@ -429,6 +427,8 @@ public class SenderManager {
             if (result != null && result.equals(SendResult.OK)) {
                 message.getAckInfo().setHasAck(true);
                 getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt);
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId,
+                        System.currentTimeMillis(), message.getMsgCnt(), message.getTotalSize());
             } else {
                 LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, "
                         + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result);