You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/23 03:52:25 UTC
[inlong] branch master updated: [INLONG-5992][Sort] Fix runtimeContext not initialized in JDBC and ES when reload metric status (#5994)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fd09a6d16 [INLONG-5992][Sort] Fix runtimeContext not initialized in JDBC and ES when reload metric status (#5994)
fd09a6d16 is described below
commit fd09a6d16eec068cf0b1d4b3b1708459badd2d43
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Fri Sep 23 11:52:20 2022 +0800
[INLONG-5992][Sort] Fix runtimeContext not initialized in JDBC and ES when reload metric status (#5994)
---
.../inlong/sort/base/metric/SinkMetricData.java | 54 ++++++++++++++++------
.../sort/elasticsearch/ElasticsearchSinkBase.java | 1 +
.../elasticsearch/ElasticsearchSinkFunction.java | 4 ++
.../table/RowElasticsearchSinkFunction.java | 5 ++
.../jdbc/internal/GenericJdbcSinkFunction.java | 3 +-
5 files changed, 52 insertions(+), 15 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 34f759a83..45e33c3c2 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.inlong.audit.AuditImp;
import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -53,14 +54,16 @@ public class SinkMetricData implements MetricData {
private Counter dirtyBytes;
private Meter numRecordsOutPerSecond;
private Meter numBytesOutPerSecond;
+ private RegisteredMetric registeredMetric;
public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
this.metricGroup = metricGroup;
this.labels = option.getLabels();
+ this.registeredMetric = option.getRegisteredMetric();
ThreadSafeCounter recordsOutCounter = new ThreadSafeCounter();
ThreadSafeCounter bytesOutCounter = new ThreadSafeCounter();
- switch (option.getRegisteredMetric()) {
+ switch (registeredMetric) {
case DIRTY:
registerMetricsForDirtyBytes(new ThreadSafeCounter());
registerMetricsForDirtyRecords(new ThreadSafeCounter());
@@ -291,18 +294,41 @@ public class SinkMetricData implements MetricData {
@Override
public String toString() {
- return "SinkMetricData{"
- + "metricGroup=" + metricGroup
- + ", labels=" + labels
- + ", auditImp=" + auditImp
- + ", numRecordsOut=" + numRecordsOut.getCount()
- + ", numBytesOut=" + numBytesOut.getCount()
- + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
- + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
- + ", dirtyRecords=" + dirtyRecords.getCount()
- + ", dirtyBytes=" + dirtyBytes.getCount()
- + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
- + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
- + '}';
+ switch (registeredMetric) {
+ case DIRTY:
+ return "SinkMetricData{"
+ + "metricGroup=" + metricGroup
+ + ", labels=" + labels
+ + ", auditImp=" + auditImp
+ + ", dirtyRecords=" + dirtyRecords.getCount()
+ + ", dirtyBytes=" + dirtyBytes.getCount()
+ + '}';
+ case NORMAL:
+ return "SinkMetricData{"
+ + "metricGroup=" + metricGroup
+ + ", labels=" + labels
+ + ", auditImp=" + auditImp
+ + ", numRecordsOut=" + numRecordsOut.getCount()
+ + ", numBytesOut=" + numBytesOut.getCount()
+ + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
+ + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
+ + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
+ + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
+ + '}';
+ default:
+ return "SinkMetricData{"
+ + "metricGroup=" + metricGroup
+ + ", labels=" + labels
+ + ", auditImp=" + auditImp
+ + ", numRecordsOut=" + numRecordsOut.getCount()
+ + ", numBytesOut=" + numBytesOut.getCount()
+ + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
+ + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
+ + ", dirtyRecords=" + dirtyRecords.getCount()
+ + ", dirtyBytes=" + dirtyBytes.getCount()
+ + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
+ + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
+ + '}';
+ }
}
}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index dff8b9335..7761e9ebd 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -293,6 +293,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// no initialization needed
+ elasticsearchSinkFunction.setRuntimeContext(getRuntimeContext());
elasticsearchSinkFunction.initializeState(context);
}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
index e1c019918..d68c0d6e9 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
@@ -69,4 +69,8 @@ public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
default void snapshotState(FunctionSnapshotContext context) throws Exception {
}
+
+ default void setRuntimeContext(RuntimeContext ctx) {
+
+ }
}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index a5abc4690..3e68f3e43 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -117,6 +117,11 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
}
}
+ @Override
+ public void setRuntimeContext(RuntimeContext ctx) {
+ this.runtimeContext = ctx;
+ }
+
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
if (this.inlongMetric != null) {
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
index 0afd5fb24..744fb8dc9 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
@@ -60,7 +60,8 @@ public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- outputFormat.initializeState(context);
+ outputFormat.setRuntimeContext(getRuntimeContext());
+ outputFormat.initializeState(context);
}
@Override