You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/08 05:02:29 UTC
[inlong] branch master updated: [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 32f24517f [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)
32f24517f is described below
commit 32f24517f969b237baeb8f1388a46a5285fef425
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Thu Sep 8 13:02:23 2022 +0800
[INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)
---
.../apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java | 7 -------
.../sort/elasticsearch/table/RowElasticsearchSinkFunction.java | 5 +++--
2 files changed, 3 insertions(+), 9 deletions(-)
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index bb963b0a1..000e1c23a 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -274,10 +274,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
sinkMetricData.registerMetricsForDirtyBytes();
sinkMetricData.registerMetricsForDirtyRecords();
- sinkMetricData.registerMetricsForNumBytesOut();
- sinkMetricData.registerMetricsForNumRecordsOut();
- sinkMetricData.registerMetricsForNumBytesOutPerSecond();
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
}
callBridge.verifyClientConnection(client);
bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
@@ -513,9 +509,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
}
}
}
- if (sinkMetricData.getNumRecordsOut() != null) {
- sinkMetricData.getNumRecordsOut().inc();
- }
}
} catch (Throwable t) {
// fail the sink and skip the rest of the items
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index a5a50b20f..0ae93231d 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -100,8 +100,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
- sinkMetricData.registerMetricsForDirtyBytes();
- sinkMetricData.registerMetricsForDirtyRecords();
sinkMetricData.registerMetricsForNumBytesOut();
sinkMetricData.registerMetricsForNumRecordsOut();
sinkMetricData.registerMetricsForNumBytesOutPerSecond();
@@ -130,6 +128,9 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
if (sinkMetricData.getNumBytesOut() != null) {
sinkMetricData.getNumBytesOut().inc(document.length);
}
+ if (sinkMetricData.getNumRecordsOut() != null) {
+ sinkMetricData.getNumRecordsOut().inc();
+ }
outputMetricForAudit(document.length);
}