You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/21 06:16:55 UTC

[incubator-inlong] branch master updated: [INLONG-3250] Fix duration error of DataProxy metric (#3251)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f0993ea  [INLONG-3250] Fix duration error of DataProxy metric (#3251)
f0993ea is described below

commit f0993eaf358fafa554e0d246d8313975b742a340
Author: 卢春亮 <94...@qq.com>
AuthorDate: Mon Mar 21 14:16:49 2022 +0800

    [INLONG-3250] Fix duration error of DataProxy metric (#3251)
---
 .../dataproxy/sink/kafkazone/KafkaZoneSinkContext.java     | 14 +++++++-------
 .../dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java   | 14 +++++++-------
 .../dataproxy/sink/tubezone/TubeZoneSinkContext.java       | 14 +++++++-------
 3 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
index a6fc4a4..99ee3d6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
@@ -231,8 +231,8 @@ public class KafkaZoneSinkContext extends SinkContext {
         fillInlongId(currentRecord, dimensions);
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
         dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
-        long msgTime = currentRecord.getDispatchTime();
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        long dispatchTime = currentRecord.getDispatchTime();
+        long auditFormatTime = dispatchTime - dispatchTime % CommonPropertiesHolder.getAuditFormatInterval();
         dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
         long count = currentRecord.getCount();
@@ -247,11 +247,11 @@ public class KafkaZoneSinkContext extends SinkContext {
                 long currentTime = System.currentTimeMillis();
                 currentRecord.getEvents().forEach((event) -> {
                     long sinkDuration = currentTime - sendTime;
-                    long nodeDuration = currentTime - event.getMsgTime();
-                    long wholeDuration = currentTime - msgTime;
-                    metricItem.sinkDuration.addAndGet(sinkDuration * count);
-                    metricItem.nodeDuration.addAndGet(nodeDuration * count);
-                    metricItem.wholeDuration.addAndGet(wholeDuration * count);
+                    long nodeDuration = currentTime - event.getSourceTime();
+                    long wholeDuration = currentTime - event.getMsgTime();
+                    metricItem.sinkDuration.addAndGet(sinkDuration);
+                    metricItem.nodeDuration.addAndGet(nodeDuration);
+                    metricItem.wholeDuration.addAndGet(wholeDuration);
                 });
             }
         } else {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
index 1a1182b..e9ea542 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
@@ -231,8 +231,8 @@ public class PulsarZoneSinkContext extends SinkContext {
         fillInlongId(currentRecord, dimensions);
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
         dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
-        long msgTime = currentRecord.getDispatchTime();
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        long dispatchTime = currentRecord.getDispatchTime();
+        long auditFormatTime = dispatchTime - dispatchTime % CommonPropertiesHolder.getAuditFormatInterval();
         dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
         long count = currentRecord.getCount();
@@ -247,11 +247,11 @@ public class PulsarZoneSinkContext extends SinkContext {
                 long currentTime = System.currentTimeMillis();
                 currentRecord.getEvents().forEach((event) -> {
                     long sinkDuration = currentTime - sendTime;
-                    long nodeDuration = currentTime - event.getMsgTime();
-                    long wholeDuration = currentTime - msgTime;
-                    metricItem.sinkDuration.addAndGet(sinkDuration * count);
-                    metricItem.nodeDuration.addAndGet(nodeDuration * count);
-                    metricItem.wholeDuration.addAndGet(wholeDuration * count);
+                    long nodeDuration = currentTime - event.getSourceTime();
+                    long wholeDuration = currentTime - event.getMsgTime();
+                    metricItem.sinkDuration.addAndGet(sinkDuration);
+                    metricItem.nodeDuration.addAndGet(nodeDuration);
+                    metricItem.wholeDuration.addAndGet(wholeDuration);
                 });
             }
         } else {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
index ceaf2fc..e26b377 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
@@ -231,8 +231,8 @@ public class TubeZoneSinkContext extends SinkContext {
         fillInlongId(currentRecord, dimensions);
         dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
         dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
-        long msgTime = currentRecord.getDispatchTime();
-        long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+        long dispatchTime = currentRecord.getDispatchTime();
+        long auditFormatTime = dispatchTime - dispatchTime % CommonPropertiesHolder.getAuditFormatInterval();
         dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
         long count = currentRecord.getCount();
@@ -247,11 +247,11 @@ public class TubeZoneSinkContext extends SinkContext {
                 long currentTime = System.currentTimeMillis();
                 currentRecord.getEvents().forEach((event) -> {
                     long sinkDuration = currentTime - sendTime;
-                    long nodeDuration = currentTime - event.getMsgTime();
-                    long wholeDuration = currentTime - msgTime;
-                    metricItem.sinkDuration.addAndGet(sinkDuration * count);
-                    metricItem.nodeDuration.addAndGet(nodeDuration * count);
-                    metricItem.wholeDuration.addAndGet(wholeDuration * count);
+                    long nodeDuration = currentTime - event.getSourceTime();
+                    long wholeDuration = currentTime - event.getMsgTime();
+                    metricItem.sinkDuration.addAndGet(sinkDuration);
+                    metricItem.nodeDuration.addAndGet(nodeDuration);
+                    metricItem.wholeDuration.addAndGet(wholeDuration);
                 });
             }
         } else {