You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 11:47:39 UTC

[inlong] branch release-1.3.0 updated: [INLONG-6005][DataProxy] Fix resend read values (#6006)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 9c48aa115 [INLONG-6005][DataProxy] Fix resend read values (#6006)
9c48aa115 is described below

commit 9c48aa115bc338e877c66bcba7f04cfde78173a8
Author: Lucas <10...@users.noreply.github.com>
AuthorDate: Fri Sep 23 18:43:51 2022 +0800

    [INLONG-6005][DataProxy] Fix resend read values (#6006)
---
 .../org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java | 2 ++
 .../src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java  | 6 ------
 .../src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java    | 4 ----
 3 files changed, 2 insertions(+), 10 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index e420e03a2..cb52823bb 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -161,6 +161,8 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
                 metricItem.sendFailCount.incrementAndGet();
                 metricItem.sendFailSize.addAndGet(event.getBody().length);
             }
+            metricItem.sendCount.incrementAndGet();
+            metricItem.sendSize.addAndGet(event.getBody().length);
         }
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 5c21a52e4..f0f1b6699 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -402,14 +402,8 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                             + "--> pulsar,Check if pulsar server or network is ok.(if this situation "
                             + "last long time it will cause memoryChannel full and fileChannel write.)", getName());
                     tx.rollback();
-                    // metric
-                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
-                            event, false, event.getBody().length);
                 } else {
                     tx.commit();
-                    // metric
-                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
-                            event, true, event.getBody().length);
                 }
             } else {
                 status = Status.BACKOFF;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 2544792ed..400eaa092 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -273,12 +273,8 @@ public class TubeSink extends AbstractSink implements Configurable {
                 if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
                     tx.commit();
                     cachedMsgCnt.incrementAndGet();
-                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
-                            event, true, event.getBody().length);
                 } else {
                     tx.rollback();
-                    this.metricItemSet.fillSinkReadMetricItemsByEvent(
-                            event, false, event.getBody().length);
                 }
             } else {
                 status = Status.BACKOFF;