You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/11/12 06:57:44 UTC
(inlong) branch master updated: [INLONG-9265][Agent] Add audit of agent send success (#9266)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e51e9edfc9 [INLONG-9265][Agent] Add audit of agent send success (#9266)
e51e9edfc9 is described below
commit e51e9edfc913c6c6bc55df111531a1c8254b82ff
Author: justinwwhuang <hw...@163.com>
AuthorDate: Sun Nov 12 14:57:39 2023 +0800
[INLONG-9265][Agent] Add audit of agent send success (#9266)
---
.../apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index abb21fe8ba..d13ef19963 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -28,6 +28,7 @@ import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
@@ -47,7 +48,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@@ -122,7 +122,6 @@ public class SenderManager {
private List<PackageAckInfo> packageAckInfoList = new ArrayList<>();
private final ReentrantReadWriteLock packageAckInfoLock = new ReentrantReadWriteLock(true);
protected InstanceProfile profile;
- private Random testRandom = new Random();
private volatile boolean offsetRunning = false;
private volatile boolean resendRunning = false;
private volatile boolean started = false;
@@ -175,7 +174,6 @@ public class SenderManager {
this.metricItemSet = new AgentMetricItemSet(metricName);
MetricRegister.register(metricItemSet);
resendQueue = new LinkedBlockingQueue<>();
-
}
public void Start() throws Exception {
@@ -429,6 +427,8 @@ public class SenderManager {
if (result != null && result.equals(SendResult.OK)) {
message.getAckInfo().setHasAck(true);
getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId,
+ System.currentTimeMillis(), message.getMsgCnt(), message.getTotalSize());
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result);