You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/25 03:22:36 UTC

[inlong] branch master updated: [INLONG-6012][DataProxy] Adjust the fields statistics unit in DataProxyMetricItemSet (#6013)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c041a5ab6 [INLONG-6012][DataProxy] Adjust the fields statistics unit in DataProxyMetricItemSet (#6013)
c041a5ab6 is described below

commit c041a5ab6baea8993909f3c30d93a3a6fc62929e
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Sep 25 11:22:30 2022 +0800

    [INLONG-6012][DataProxy] Adjust the fields statistics unit in DataProxyMetricItemSet (#6013)
---
 .../dataproxy/metrics/DataProxyMetricItemSet.java  | 29 ++++++++--------------
 1 file changed, 10 insertions(+), 19 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index cb52823bb..1256c328a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -80,17 +80,6 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
         fillMetricItemsByEvent(event, true, true, isSuccess, size, 0);
     }
 
-    /**
-     * Fill sink metric items by event
-     *
-     * @param event    the event object
-     * @param isSuccess  whether success read or send
-     * @param size       the message size
-     */
-    public void fillSinkReadMetricItemsByEvent(Event event, boolean isSuccess, long size) {
-        fillMetricItemsByEvent(event, false, true, isSuccess, size, 0);
-    }
-
     /**
      * Fill sink send metric items by event
      *
@@ -124,6 +113,8 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
                 event.getHeaders().get(AttributeConstants.STREAM_ID));
         long dataTime = NumberUtils.toLong(
                 event.getHeaders().get(AttributeConstants.DATA_TIME));
+        long msgCount = NumberUtils.toLong(
+                event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
         long auditFormatTime = dataTime - dataTime % CommonPropertiesHolder.getAuditFormatInterval();
         dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         if (isSource) {
@@ -137,16 +128,16 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
         DataProxyMetricItem metricItem = findMetricItem(dimensions);
         if (isReadOp) {
             if (isSuccess) {
-                metricItem.readSuccessCount.incrementAndGet();
+                metricItem.readSuccessCount.addAndGet(msgCount);
                 metricItem.readSuccessSize.addAndGet(size);
             } else {
-                metricItem.readFailCount.incrementAndGet();
+                metricItem.readFailCount.addAndGet(msgCount);
                 metricItem.readFailSize.addAndGet(size);
             }
         } else {
             if (isSuccess) {
-                metricItem.sendSuccessCount.incrementAndGet();
-                metricItem.sendSuccessSize.addAndGet(event.getBody().length);
+                metricItem.sendSuccessCount.addAndGet(msgCount);
+                metricItem.sendSuccessSize.addAndGet(size);
                 if (sendTime > 0) {
                     long currentTime = System.currentTimeMillis();
                     long msgDataTimeL = Long.parseLong(
@@ -158,11 +149,11 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
                     metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL);
                 }
             } else {
-                metricItem.sendFailCount.incrementAndGet();
-                metricItem.sendFailSize.addAndGet(event.getBody().length);
+                metricItem.sendFailCount.addAndGet(msgCount);
+                metricItem.sendFailSize.addAndGet(size);
             }
-            metricItem.sendCount.incrementAndGet();
-            metricItem.sendSize.addAndGet(event.getBody().length);
+            metricItem.sendCount.addAndGet(msgCount);
+            metricItem.sendSize.addAndGet(size);
         }
     }