You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 09:19:57 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 86a53e1e2 [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)
86a53e1e2 is described below
commit 86a53e1e25f0d1f530d52c8e50a4a8ddcf14d178
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Mon Sep 19 14:09:10 2022 +0800
[INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)
---
.../inlong/agent/plugin/sinks/ProxySink.java | 4 ---
.../inlong/agent/plugin/sinks/SenderManager.java | 32 ++++++++++++++--------
.../inlong/common/msg/AttributeConstants.java | 1 -
3 files changed, 20 insertions(+), 17 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index abefbda31..8e1e6940b 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -23,7 +23,6 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.ProxyMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.message.PackProxyMessage;
@@ -104,11 +103,8 @@ public class ProxySink extends AbstractSink {
}
// add message to package proxy
packProxyMessage.addProxyMessage(proxyMessage);
- //
return packProxyMessage;
});
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
- inlongGroupId, inlongStreamId, System.currentTimeMillis());
// increment the count of successful sinks
sinkMetric.sinkSuccessCount.incrementAndGet();
} else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 248a932b2..3b8d1ee95 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -24,6 +24,7 @@ import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.core.task.TaskPositionManager;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.metric.MetricRegister;
@@ -226,13 +227,8 @@ public class SenderManager {
}
try {
selectSender(groupId).asyncSendMessage(
- new AgentSenderCallback(jobId, groupId, streamId, bodyList, retry, dataTime),
- bodyList, groupId, streamId,
- dataTime,
- SEQUENTIAL_ID.getNextUuid(),
- maxSenderTimeout,
- TimeUnit.SECONDS
- );
+ new AgentSenderCallback(jobId, groupId, streamId, bodyList, retry, dataTime), bodyList,
+ groupId, streamId, dataTime, SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS);
} catch (Exception exception) {
LOGGER.error("Exception caught", exception);
// retry time
@@ -259,17 +255,28 @@ public class SenderManager {
LOGGER.warn("max retry reached, retry count is {}, sleep and send again", retry);
AgentUtils.silenceSleepInMs(retrySleepTime);
}
+ Map<String, String> dims = new HashMap<>();
+ dims.put(KEY_INLONG_GROUP_ID, groupId);
+ dims.put(KEY_INLONG_STREAM_ID, streamId);
try {
- selectSender(groupId).sendMessage(
- bodyList, groupId, streamId, dataTime, "",
- maxSenderTimeout, TimeUnit.SECONDS, extraMap
- );
- semaphore.release(bodyList.size());
+ SendResult result = selectSender(groupId).sendMessage(bodyList, groupId, streamId, dataTime, "",
+ maxSenderTimeout, TimeUnit.SECONDS, extraMap);
+ if (result == SendResult.OK) {
+ semaphore.release(bodyList.size());
+ getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size());
+ } else {
+ getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size());
+ LOGGER.warn("send data to dataproxy error {}", result.toString());
+ sendBatchSync(groupId, streamId, bodyList, retry + 1, dataTime, extraMap);
+ }
+
} catch (Exception exception) {
LOGGER.error("Exception caught", exception);
// retry time
try {
TimeUnit.SECONDS.sleep(1);
+ getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size());
sendBatchSync(groupId, streamId, bodyList, retry + 1, dataTime, extraMap);
} catch (Exception ignored) {
// ignore it.
@@ -312,6 +319,7 @@ public class SenderManager {
Map<String, String> dims = new HashMap<>();
dims.put(KEY_INLONG_GROUP_ID, groupId);
dims.put(KEY_INLONG_STREAM_ID, streamId);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size());
getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
if (sourcePath != null) {
taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index 4d4dcb4b8..684520305 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -44,7 +44,6 @@ public interface AttributeConstants {
* data time
*/
String DATA_TIME = "dt";
-
String TIME_STAMP = "t";
/* compress type */