You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/08 05:02:29 UTC

[inlong] branch master updated: [INLONG-5822][Sort] Fix the error where the metric is empty for Elasticsearch (#5823)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 32f24517f [INLONG-5822][Sort] Fix the error where the metric is empty for Elasticsearch (#5823)
32f24517f is described below

commit 32f24517f969b237baeb8f1388a46a5285fef425
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Thu Sep 8 13:02:23 2022 +0800

    [INLONG-5822][Sort] Fix the error where the metric is empty for Elasticsearch (#5823)
---
 .../apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java    | 7 -------
 .../sort/elasticsearch/table/RowElasticsearchSinkFunction.java     | 5 +++--
 2 files changed, 3 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index bb963b0a1..000e1c23a 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -274,10 +274,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
             sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
             sinkMetricData.registerMetricsForDirtyBytes();
             sinkMetricData.registerMetricsForDirtyRecords();
-            sinkMetricData.registerMetricsForNumBytesOut();
-            sinkMetricData.registerMetricsForNumRecordsOut();
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
         }
         callBridge.verifyClientConnection(client);
         bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
@@ -513,9 +509,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
                                 }
                             }
                         }
-                        if (sinkMetricData.getNumRecordsOut() != null) {
-                            sinkMetricData.getNumRecordsOut().inc();
-                        }
                     }
                 } catch (Throwable t) {
                     // fail the sink and skip the rest of the items
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index a5a50b20f..0ae93231d 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -100,8 +100,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
             streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
             sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
-            sinkMetricData.registerMetricsForDirtyBytes();
-            sinkMetricData.registerMetricsForDirtyRecords();
             sinkMetricData.registerMetricsForNumBytesOut();
             sinkMetricData.registerMetricsForNumRecordsOut();
             sinkMetricData.registerMetricsForNumBytesOutPerSecond();
@@ -130,6 +128,9 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
         if (sinkMetricData.getNumBytesOut() != null) {
             sinkMetricData.getNumBytesOut().inc(document.length);
         }
+        if (sinkMetricData.getNumRecordsOut() != null) {
+            sinkMetricData.getNumRecordsOut().inc();
+        }
         outputMetricForAudit(document.length);
     }