You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/08 05:03:19 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 7abb87d13 [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)
7abb87d13 is described below
commit 7abb87d13902345e20d8f2d3e1667961e506616a
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Thu Sep 8 13:02:23 2022 +0800
[INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)
---
.../apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java | 7 -------
.../sort/elasticsearch/table/RowElasticsearchSinkFunction.java | 5 +++--
2 files changed, 3 insertions(+), 9 deletions(-)
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 399b629cb..da5b5f9ef 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -273,10 +273,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
sinkMetricData.registerMetricsForDirtyBytes();
sinkMetricData.registerMetricsForDirtyRecords();
- sinkMetricData.registerMetricsForNumBytesOut();
- sinkMetricData.registerMetricsForNumRecordsOut();
- sinkMetricData.registerMetricsForNumBytesOutPerSecond();
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
}
callBridge.verifyClientConnection(client);
bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
@@ -512,9 +508,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
}
}
}
- if (sinkMetricData.getNumRecordsOut() != null) {
- sinkMetricData.getNumRecordsOut().inc();
- }
}
} catch (Throwable t) {
// fail the sink and skip the rest of the items
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 52ab78fe2..80abeef54 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -100,8 +100,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
streamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
- sinkMetricData.registerMetricsForDirtyBytes();
- sinkMetricData.registerMetricsForDirtyRecords();
sinkMetricData.registerMetricsForNumBytesOut();
sinkMetricData.registerMetricsForNumRecordsOut();
sinkMetricData.registerMetricsForNumBytesOutPerSecond();
@@ -130,6 +128,9 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
if (sinkMetricData.getNumBytesOut() != null) {
sinkMetricData.getNumBytesOut().inc(document.length);
}
+ if (sinkMetricData.getNumRecordsOut() != null) {
+ sinkMetricData.getNumRecordsOut().inc();
+ }
outputMetricForAudit(document.length);
}