You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/25 03:22:36 UTC
[inlong] branch master updated: [INLONG-6012][DataProxy] Adjust the fields statistics unit in DataProxyMetricItemSet (#6013)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c041a5ab6 [INLONG-6012][DataProxy] Adjust the fields statistics unit in DataProxyMetricItemSet (#6013)
c041a5ab6 is described below
commit c041a5ab6baea8993909f3c30d93a3a6fc62929e
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Sep 25 11:22:30 2022 +0800
[INLONG-6012][DataProxy] Adjust the fields statistics unit in DataProxyMetricItemSet (#6013)
---
.../dataproxy/metrics/DataProxyMetricItemSet.java | 29 ++++++++--------------
1 file changed, 10 insertions(+), 19 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index cb52823bb..1256c328a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -80,17 +80,6 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
fillMetricItemsByEvent(event, true, true, isSuccess, size, 0);
}
- /**
- * Fill sink metric items by event
- *
- * @param event the event object
- * @param isSuccess whether success read or send
- * @param size the message size
- */
- public void fillSinkReadMetricItemsByEvent(Event event, boolean isSuccess, long size) {
- fillMetricItemsByEvent(event, false, true, isSuccess, size, 0);
- }
-
/**
* Fill sink send metric items by event
*
@@ -124,6 +113,8 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
event.getHeaders().get(AttributeConstants.STREAM_ID));
long dataTime = NumberUtils.toLong(
event.getHeaders().get(AttributeConstants.DATA_TIME));
+ long msgCount = NumberUtils.toLong(
+ event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
long auditFormatTime = dataTime - dataTime % CommonPropertiesHolder.getAuditFormatInterval();
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
if (isSource) {
@@ -137,16 +128,16 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
DataProxyMetricItem metricItem = findMetricItem(dimensions);
if (isReadOp) {
if (isSuccess) {
- metricItem.readSuccessCount.incrementAndGet();
+ metricItem.readSuccessCount.addAndGet(msgCount);
metricItem.readSuccessSize.addAndGet(size);
} else {
- metricItem.readFailCount.incrementAndGet();
+ metricItem.readFailCount.addAndGet(msgCount);
metricItem.readFailSize.addAndGet(size);
}
} else {
if (isSuccess) {
- metricItem.sendSuccessCount.incrementAndGet();
- metricItem.sendSuccessSize.addAndGet(event.getBody().length);
+ metricItem.sendSuccessCount.addAndGet(msgCount);
+ metricItem.sendSuccessSize.addAndGet(size);
if (sendTime > 0) {
long currentTime = System.currentTimeMillis();
long msgDataTimeL = Long.parseLong(
@@ -158,11 +149,11 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL);
}
} else {
- metricItem.sendFailCount.incrementAndGet();
- metricItem.sendFailSize.addAndGet(event.getBody().length);
+ metricItem.sendFailCount.addAndGet(msgCount);
+ metricItem.sendFailSize.addAndGet(size);
}
- metricItem.sendCount.incrementAndGet();
- metricItem.sendSize.addAndGet(event.getBody().length);
+ metricItem.sendCount.addAndGet(msgCount);
+ metricItem.sendSize.addAndGet(size);
}
}