You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/26 11:47:39 UTC
[inlong] branch release-1.3.0 updated: [INLONG-6005][DataProxy] Fix resend read values (#6006)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 9c48aa115 [INLONG-6005][DataProxy] Fix resend read values (#6006)
9c48aa115 is described below
commit 9c48aa115bc338e877c66bcba7f04cfde78173a8
Author: Lucas <10...@users.noreply.github.com>
AuthorDate: Fri Sep 23 18:43:51 2022 +0800
[INLONG-6005][DataProxy] Fix resend read values (#6006)
---
.../org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java | 2 ++
.../src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java | 6 ------
.../src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java | 4 ----
3 files changed, 2 insertions(+), 10 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index e420e03a2..cb52823bb 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -161,6 +161,8 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
metricItem.sendFailCount.incrementAndGet();
metricItem.sendFailSize.addAndGet(event.getBody().length);
}
+ metricItem.sendCount.incrementAndGet();
+ metricItem.sendSize.addAndGet(event.getBody().length);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 5c21a52e4..f0f1b6699 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -402,14 +402,8 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
+ "--> pulsar,Check if pulsar server or network is ok.(if this situation "
+ "last long time it will cause memoryChannel full and fileChannel write.)", getName());
tx.rollback();
- // metric
- this.metricItemSet.fillSinkReadMetricItemsByEvent(
- event, false, event.getBody().length);
} else {
tx.commit();
- // metric
- this.metricItemSet.fillSinkReadMetricItemsByEvent(
- event, true, event.getBody().length);
}
} else {
status = Status.BACKOFF;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 2544792ed..400eaa092 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -273,12 +273,8 @@ public class TubeSink extends AbstractSink implements Configurable {
if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
tx.commit();
cachedMsgCnt.incrementAndGet();
- this.metricItemSet.fillSinkReadMetricItemsByEvent(
- event, true, event.getBody().length);
} else {
tx.rollback();
- this.metricItemSet.fillSinkReadMetricItemsByEvent(
- event, false, event.getBody().length);
}
} else {
status = Status.BACKOFF;