You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/21 06:16:55 UTC
[incubator-inlong] branch master updated: [INLONG-3250] Fix duration error of DataProxy metric (#3251)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f0993ea [INLONG-3250] Fix duration error of DataProxy metric (#3251)
f0993ea is described below
commit f0993eaf358fafa554e0d246d8313975b742a340
Author: 卢春亮 <94...@qq.com>
AuthorDate: Mon Mar 21 14:16:49 2022 +0800
[INLONG-3250] Fix duration error of DataProxy metric (#3251)
---
.../dataproxy/sink/kafkazone/KafkaZoneSinkContext.java | 14 +++++++-------
.../dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java | 14 +++++++-------
.../dataproxy/sink/tubezone/TubeZoneSinkContext.java | 14 +++++++-------
3 files changed, 21 insertions(+), 21 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
index a6fc4a4..99ee3d6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/kafkazone/KafkaZoneSinkContext.java
@@ -231,8 +231,8 @@ public class KafkaZoneSinkContext extends SinkContext {
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
- long msgTime = currentRecord.getDispatchTime();
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ long dispatchTime = currentRecord.getDispatchTime();
+ long auditFormatTime = dispatchTime - dispatchTime % CommonPropertiesHolder.getAuditFormatInterval();
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
long count = currentRecord.getCount();
@@ -247,11 +247,11 @@ public class KafkaZoneSinkContext extends SinkContext {
long currentTime = System.currentTimeMillis();
currentRecord.getEvents().forEach((event) -> {
long sinkDuration = currentTime - sendTime;
- long nodeDuration = currentTime - event.getMsgTime();
- long wholeDuration = currentTime - msgTime;
- metricItem.sinkDuration.addAndGet(sinkDuration * count);
- metricItem.nodeDuration.addAndGet(nodeDuration * count);
- metricItem.wholeDuration.addAndGet(wholeDuration * count);
+ long nodeDuration = currentTime - event.getSourceTime();
+ long wholeDuration = currentTime - event.getMsgTime();
+ metricItem.sinkDuration.addAndGet(sinkDuration);
+ metricItem.nodeDuration.addAndGet(nodeDuration);
+ metricItem.wholeDuration.addAndGet(wholeDuration);
});
}
} else {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
index 1a1182b..e9ea542 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.java
@@ -231,8 +231,8 @@ public class PulsarZoneSinkContext extends SinkContext {
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
- long msgTime = currentRecord.getDispatchTime();
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ long dispatchTime = currentRecord.getDispatchTime();
+ long auditFormatTime = dispatchTime - dispatchTime % CommonPropertiesHolder.getAuditFormatInterval();
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
long count = currentRecord.getCount();
@@ -247,11 +247,11 @@ public class PulsarZoneSinkContext extends SinkContext {
long currentTime = System.currentTimeMillis();
currentRecord.getEvents().forEach((event) -> {
long sinkDuration = currentTime - sendTime;
- long nodeDuration = currentTime - event.getMsgTime();
- long wholeDuration = currentTime - msgTime;
- metricItem.sinkDuration.addAndGet(sinkDuration * count);
- metricItem.nodeDuration.addAndGet(nodeDuration * count);
- metricItem.wholeDuration.addAndGet(wholeDuration * count);
+ long nodeDuration = currentTime - event.getSourceTime();
+ long wholeDuration = currentTime - event.getMsgTime();
+ metricItem.sinkDuration.addAndGet(sinkDuration);
+ metricItem.nodeDuration.addAndGet(nodeDuration);
+ metricItem.wholeDuration.addAndGet(wholeDuration);
});
}
} else {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
index ceaf2fc..e26b377 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/tubezone/TubeZoneSinkContext.java
@@ -231,8 +231,8 @@ public class TubeZoneSinkContext extends SinkContext {
fillInlongId(currentRecord, dimensions);
dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getSinkName());
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, bid);
- long msgTime = currentRecord.getDispatchTime();
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ long dispatchTime = currentRecord.getDispatchTime();
+ long auditFormatTime = dispatchTime - dispatchTime % CommonPropertiesHolder.getAuditFormatInterval();
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
DataProxyMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
long count = currentRecord.getCount();
@@ -247,11 +247,11 @@ public class TubeZoneSinkContext extends SinkContext {
long currentTime = System.currentTimeMillis();
currentRecord.getEvents().forEach((event) -> {
long sinkDuration = currentTime - sendTime;
- long nodeDuration = currentTime - event.getMsgTime();
- long wholeDuration = currentTime - msgTime;
- metricItem.sinkDuration.addAndGet(sinkDuration * count);
- metricItem.nodeDuration.addAndGet(nodeDuration * count);
- metricItem.wholeDuration.addAndGet(wholeDuration * count);
+ long nodeDuration = currentTime - event.getSourceTime();
+ long wholeDuration = currentTime - event.getMsgTime();
+ metricItem.sinkDuration.addAndGet(sinkDuration);
+ metricItem.nodeDuration.addAndGet(nodeDuration);
+ metricItem.wholeDuration.addAndGet(wholeDuration);
});
}
} else {