You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/08 05:03:19 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 7abb87d13 [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)
7abb87d13 is described below

commit 7abb87d13902345e20d8f2d3e1667961e506616a
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Thu Sep 8 13:02:23 2022 +0800

    [INLONG-5822][Sort] Fix the error of metric is empty for Elasticsearch (#5823)
---
 .../apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java    | 7 -------
 .../sort/elasticsearch/table/RowElasticsearchSinkFunction.java     | 5 +++--
 2 files changed, 3 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 399b629cb..da5b5f9ef 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -273,10 +273,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
             sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
             sinkMetricData.registerMetricsForDirtyBytes();
             sinkMetricData.registerMetricsForDirtyRecords();
-            sinkMetricData.registerMetricsForNumBytesOut();
-            sinkMetricData.registerMetricsForNumRecordsOut();
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
         }
         callBridge.verifyClientConnection(client);
         bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
@@ -512,9 +508,6 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
                                 }
                             }
                         }
-                        if (sinkMetricData.getNumRecordsOut() != null) {
-                            sinkMetricData.getNumRecordsOut().inc();
-                        }
                     }
                 } catch (Throwable t) {
                     // fail the sink and skip the rest of the items
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 52ab78fe2..80abeef54 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -100,8 +100,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
             streamId = inLongMetricArray[1];
             String nodeId = inLongMetricArray[2];
             sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
-            sinkMetricData.registerMetricsForDirtyBytes();
-            sinkMetricData.registerMetricsForDirtyRecords();
             sinkMetricData.registerMetricsForNumBytesOut();
             sinkMetricData.registerMetricsForNumRecordsOut();
             sinkMetricData.registerMetricsForNumBytesOutPerSecond();
@@ -130,6 +128,9 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
         if (sinkMetricData.getNumBytesOut() != null) {
             sinkMetricData.getNumBytesOut().inc(document.length);
         }
+        if (sinkMetricData.getNumRecordsOut() != null) {
+            sinkMetricData.getNumRecordsOut().inc();
+        }
         outputMetricForAudit(document.length);
     }