You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/10 14:29:39 UTC

[inlong] branch master updated: [INLONG-5470][Sort] Fix compile error (#5471)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 78ae5ff38 [INLONG-5470][Sort] Fix compile error (#5471)
78ae5ff38 is described below

commit 78ae5ff38996c2790e2184dc840ad1a7416d26c5
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Wed Aug 10 22:29:34 2022 +0800

    [INLONG-5470][Sort] Fix compile error (#5471)
---
 .../inlong/sort/elasticsearch/ElasticsearchSinkBase.java | 16 +++++++---------
 .../table/RowElasticsearchSinkFunction.java              | 15 +++++++--------
 2 files changed, 14 insertions(+), 17 deletions(-)

diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 4dd35d060..399b629cb 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -265,20 +265,18 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
     @Override
     public void open(Configuration parameters) throws Exception {
         client = callBridge.createClient(userConfig);
-        sinkMetricData = new SinkMetricData(getRuntimeContext().getMetricGroup());
         if (inLongMetric != null && !inLongMetric.isEmpty()) {
             String[] inLongMetricArray = inLongMetric.split("&");
             String groupId = inLongMetricArray[0];
             String streamId = inLongMetricArray[1];
             String nodeId = inLongMetricArray[2];
-            sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
-            sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
-            sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
-            sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId,
-                    "numBytesOutPerSecond");
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
-                    "numRecordsOutPerSecond");
+            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
+            sinkMetricData.registerMetricsForDirtyBytes();
+            sinkMetricData.registerMetricsForDirtyRecords();
+            sinkMetricData.registerMetricsForNumBytesOut();
+            sinkMetricData.registerMetricsForNumRecordsOut();
+            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
         }
         callBridge.verifyClientConnection(client);
         bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 8af55faaf..d300158d9 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -82,19 +82,18 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
     public void open(RuntimeContext ctx) {
         indexGenerator.open();
         this.runtimeContext = ctx;
-        sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
         if (inLongMetric != null && !inLongMetric.isEmpty()) {
             String[] inLongMetricArray = inLongMetric.split("&");
             String groupId = inLongMetricArray[0];
             String streamId = inLongMetricArray[1];
             String nodeId = inLongMetricArray[2];
-            sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
-            sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
-            sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
-            sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
-            sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
-            sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
-                    "numRecordsOutPerSecond");
+            sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
+            sinkMetricData.registerMetricsForDirtyBytes();
+            sinkMetricData.registerMetricsForDirtyRecords();
+            sinkMetricData.registerMetricsForNumBytesOut();
+            sinkMetricData.registerMetricsForNumRecordsOut();
+            sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+            sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
         }
     }