You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 09:19:57 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 86a53e1e2 [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)
86a53e1e2 is described below

commit 86a53e1e25f0d1f530d52c8e50a4a8ddcf14d178
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Mon Sep 19 14:09:10 2022 +0800

    [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)
---
 .../inlong/agent/plugin/sinks/ProxySink.java       |  4 ---
 .../inlong/agent/plugin/sinks/SenderManager.java   | 32 ++++++++++++++--------
 .../inlong/common/msg/AttributeConstants.java      |  1 -
 3 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index abefbda31..8e1e6940b 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -23,7 +23,6 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.message.EndMessage;
 import org.apache.inlong.agent.message.ProxyMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.plugin.message.PackProxyMessage;
@@ -104,11 +103,8 @@ public class ProxySink extends AbstractSink {
                                 }
                                 // add message to package proxy
                                 packProxyMessage.addProxyMessage(proxyMessage);
-                                //
                                 return packProxyMessage;
                             });
-                    AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
-                            inlongGroupId, inlongStreamId, System.currentTimeMillis());
                     // increment the count of successful sinks
                     sinkMetric.sinkSuccessCount.incrementAndGet();
                 } else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 248a932b2..3b8d1ee95 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -24,6 +24,7 @@ import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.core.task.TaskPositionManager;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.message.SequentialID;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.metric.MetricRegister;
@@ -226,13 +227,8 @@ public class SenderManager {
         }
         try {
             selectSender(groupId).asyncSendMessage(
-                    new AgentSenderCallback(jobId, groupId, streamId, bodyList, retry, dataTime),
-                    bodyList, groupId, streamId,
-                    dataTime,
-                    SEQUENTIAL_ID.getNextUuid(),
-                    maxSenderTimeout,
-                    TimeUnit.SECONDS
-            );
+                    new AgentSenderCallback(jobId, groupId, streamId, bodyList, retry, dataTime), bodyList,
+                    groupId, streamId, dataTime, SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS);
         } catch (Exception exception) {
             LOGGER.error("Exception caught", exception);
             // retry time
@@ -259,17 +255,28 @@ public class SenderManager {
             LOGGER.warn("max retry reached, retry count is {}, sleep and send again", retry);
             AgentUtils.silenceSleepInMs(retrySleepTime);
         }
+        Map<String, String> dims = new HashMap<>();
+        dims.put(KEY_INLONG_GROUP_ID, groupId);
+        dims.put(KEY_INLONG_STREAM_ID, streamId);
         try {
-            selectSender(groupId).sendMessage(
-                    bodyList, groupId, streamId, dataTime, "",
-                    maxSenderTimeout, TimeUnit.SECONDS, extraMap
-            );
-            semaphore.release(bodyList.size());
+            SendResult result = selectSender(groupId).sendMessage(bodyList, groupId, streamId, dataTime, "",
+                    maxSenderTimeout, TimeUnit.SECONDS, extraMap);
+            if (result == SendResult.OK) {
+                semaphore.release(bodyList.size());
+                getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size());
+            } else {
+                getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size());
+                LOGGER.warn("send data to dataproxy error {}", result.toString());
+                sendBatchSync(groupId, streamId, bodyList, retry + 1, dataTime, extraMap);
+            }
+
         } catch (Exception exception) {
             LOGGER.error("Exception caught", exception);
             // retry time
             try {
                 TimeUnit.SECONDS.sleep(1);
+                getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size());
                 sendBatchSync(groupId, streamId, bodyList, retry + 1, dataTime, extraMap);
             } catch (Exception ignored) {
                 // ignore it.
@@ -312,6 +319,7 @@ public class SenderManager {
             Map<String, String> dims = new HashMap<>();
             dims.put(KEY_INLONG_GROUP_ID, groupId);
             dims.put(KEY_INLONG_STREAM_ID, streamId);
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, bodyList.size());
             getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
             if (sourcePath != null) {
                 taskPositionManager.updateSinkPosition(jobId, sourcePath, bodyList.size());
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index 4d4dcb4b8..684520305 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -44,7 +44,6 @@ public interface AttributeConstants {
      * data time
      */
     String DATA_TIME = "dt";
-
     String TIME_STAMP = "t";
 
     /* compress type */