You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/10 14:29:39 UTC
[inlong] branch master updated: [INLONG-5470][Sort] Fix compile error (#5471)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 78ae5ff38 [INLONG-5470][Sort] Fix compile error (#5471)
78ae5ff38 is described below
commit 78ae5ff38996c2790e2184dc840ad1a7416d26c5
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Wed Aug 10 22:29:34 2022 +0800
[INLONG-5470][Sort] Fix compile error (#5471)
---
.../inlong/sort/elasticsearch/ElasticsearchSinkBase.java | 16 +++++++---------
.../table/RowElasticsearchSinkFunction.java | 15 +++++++--------
2 files changed, 14 insertions(+), 17 deletions(-)
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 4dd35d060..399b629cb 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -265,20 +265,18 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
@Override
public void open(Configuration parameters) throws Exception {
client = callBridge.createClient(userConfig);
- sinkMetricData = new SinkMetricData(getRuntimeContext().getMetricGroup());
if (inLongMetric != null && !inLongMetric.isEmpty()) {
String[] inLongMetricArray = inLongMetric.split("&");
String groupId = inLongMetricArray[0];
String streamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
- sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
- sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
- sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
- sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId,
- "numBytesOutPerSecond");
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
- "numRecordsOutPerSecond");
+ sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
+ sinkMetricData.registerMetricsForDirtyBytes();
+ sinkMetricData.registerMetricsForDirtyRecords();
+ sinkMetricData.registerMetricsForNumBytesOut();
+ sinkMetricData.registerMetricsForNumRecordsOut();
+ sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+ sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
}
callBridge.verifyClientConnection(client);
bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 8af55faaf..d300158d9 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -82,19 +82,18 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
public void open(RuntimeContext ctx) {
indexGenerator.open();
this.runtimeContext = ctx;
- sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
if (inLongMetric != null && !inLongMetric.isEmpty()) {
String[] inLongMetricArray = inLongMetric.split("&");
String groupId = inLongMetricArray[0];
String streamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
- sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
- sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
- sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
- sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
- sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
- "numRecordsOutPerSecond");
+ sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
+ sinkMetricData.registerMetricsForDirtyBytes();
+ sinkMetricData.registerMetricsForDirtyRecords();
+ sinkMetricData.registerMetricsForNumBytesOut();
+ sinkMetricData.registerMetricsForNumRecordsOut();
+ sinkMetricData.registerMetricsForNumBytesOutPerSecond();
+ sinkMetricData.registerMetricsForNumRecordsOutPerSecond();
}
}