You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by wo...@apache.org on 2023/01/05 15:36:28 UTC

[inlong] branch master updated: [INLONG-7166][DataProxy] Fix audit data reporting (#7167)

This is an automated email from the ASF dual-hosted git repository.

woofyzhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8db88eccc [INLONG-7166][DataProxy] Fix audit data reporting (#7167)
8db88eccc is described below

commit 8db88eccce43f549c5db265002c791be38f2d619
Author: woofyzhao <zh...@gmail.com>
AuthorDate: Thu Jan 5 23:36:22 2023 +0800

    [INLONG-7166][DataProxy] Fix audit data reporting (#7167)
    
    * [INLONG-7166][DataProxy] Fix audit data reporting
---
 .../inlong/dataproxy/config/holder/CommonPropertiesHolder.java      | 6 ++++--
 .../java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java  | 4 ++--
 .../java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java  | 4 ++--
 .../inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java       | 6 ++++++
 .../java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java  | 4 +---
 .../apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java    | 4 ++--
 .../org/apache/inlong/sort/standalone/dispatch/DispatchManager.java | 4 ++--
 7 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index 90a5be47c..1d4fe3990 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -88,8 +88,10 @@ public class CommonPropertiesHolder {
      * @return the props
      */
     public static Map<String, String> get() {
-        if (props != null) {
-            return props;
+        synchronized (KEY_COMMON_PROPERTIES) {
+            if (props != null) {
+                return props;
+            }
         }
         init();
         return props;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
index e03ed01b4..a1da9f076 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
@@ -164,7 +164,7 @@ public class DispatchManager {
         if (!needOutputOvertimeData.getAndSet(false)) {
             return;
         }
-        LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
+        LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
                 profileCache.size(), dispatchQueues.stream().mapToInt(LinkedBlockingQueue::size).sum());
         long currentTime = System.currentTimeMillis();
         long createThreshold = currentTime - dispatchTimeout;
@@ -187,7 +187,7 @@ public class DispatchManager {
                 outCounter.addAndGet(dispatchProfile.getCount());
             }
         });
-        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+        LOG.debug("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
                 + "inCounter:{},outCounter:{}",
                 profileCache.size(), dispatchQueues.stream().mapToInt(LinkedBlockingQueue::size).sum(), eventCount,
                 inCounter.getAndSet(0), outCounter.getAndSet(0));
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index 4c1edf190..d5fae2e04 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -156,7 +156,7 @@ public class BatchPackManager {
         if (!needOutputOvertimeData.getAndSet(false)) {
             return;
         }
-        LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
+        LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
                 profileCache.size(), dispatchQueue.size());
         long currentTime = System.currentTimeMillis();
         long createThreshold = currentTime - dispatchTimeout;
@@ -179,7 +179,7 @@ public class BatchPackManager {
                 outCounter.addAndGet(dispatchProfile.getCount());
             }
         });
-        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+        LOG.debug("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
                 + "inCounter:{},outCounter:{}",
                 profileCache.size(), dispatchQueue.size(), eventCount,
                 inCounter.getAndSet(0), outCounter.getAndSet(0));
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 2af652de7..cb62fea41 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -164,6 +164,12 @@ public class MessageQueueZoneSinkContext extends SinkContext {
      * addSendResultMetric
      */
     public void addSendResultMetric(BatchPackProfile currentRecord, String topic, boolean result, long sendTime) {
+        if (currentRecord instanceof SimpleBatchPackProfileV0) {
+            AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS,
+                    ((SimpleBatchPackProfileV0) currentRecord).getSimpleProfile());
+            return;
+        }
+
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.getProxyClusterId());
         dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, "-");
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 8d52abcaf..8907b13a5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -287,9 +287,7 @@ public class TubeHandler implements MessageQueueHandler {
         // sendAsync
         Message message = new Message(topic, bodyBytes);
         // add headers
-        headers.forEach((key, value) -> {
-            message.setAttrKeyVal(key, value);
-        });
+        headers.forEach(message::setAttrKeyVal);
         // callback
         long sendTime = System.currentTimeMillis();
         MessageSentCallback callback = new MessageSentCallback() {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java
index 945a4f1dc..9ae6303bb 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java
@@ -104,7 +104,7 @@ public class DispatchManager {
      * @return
      */
     public void outputOvertimeData() {
-        LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
+        LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
                 profileCache.size(), dispatchQueue.size());
         long currentTime = System.currentTimeMillis();
         long createThreshold = currentTime - dispatchTimeout;
@@ -122,7 +122,7 @@ public class DispatchManager {
         removeKeys.forEach((key) -> {
             dispatchQueue.offer(this.profileCache.remove(key));
         });
-        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}",
+        LOG.debug("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}",
                 profileCache.size(), dispatchQueue.size(), eventCount);
     }
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
index cbea8d2f5..e61c05ade 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
@@ -115,7 +115,7 @@ public class DispatchManager {
         if (!needOutputOvertimeData.getAndSet(false)) {
             return;
         }
-        LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
+        LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
                 profileCache.size(), dispatchQueue.size());
         long currentTime = System.currentTimeMillis();
         long createThreshold = currentTime - dispatchTimeout;
@@ -139,7 +139,7 @@ public class DispatchManager {
                 outCounter.addAndGet(dispatchProfile.getCount());
             }
         });
-        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+        LOG.debug("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
                 + "inCounter:{},outCounter:{}",
                 profileCache.size(), dispatchQueue.size(), eventCount,
                 inCounter.getAndSet(0), outCounter.getAndSet(0));