You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/23 03:52:25 UTC

[inlong] branch master updated: [INLONG-5992][Sort] Fix runtimeContext not initialized in JDBC and ES when reload metric status (#5994)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fd09a6d16 [INLONG-5992][Sort] Fix runtimeContext not initialized in JDBC and ES when reload metric status (#5994)
fd09a6d16 is described below

commit fd09a6d16eec068cf0b1d4b3b1708459badd2d43
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Fri Sep 23 11:52:20 2022 +0800

    [INLONG-5992][Sort] Fix runtimeContext not initialized in JDBC and ES when reload metric status (#5994)
---
 .../inlong/sort/base/metric/SinkMetricData.java    | 54 ++++++++++++++++------
 .../sort/elasticsearch/ElasticsearchSinkBase.java  |  1 +
 .../elasticsearch/ElasticsearchSinkFunction.java   |  4 ++
 .../table/RowElasticsearchSinkFunction.java        |  5 ++
 .../jdbc/internal/GenericJdbcSinkFunction.java     |  3 +-
 5 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 34f759a83..45e33c3c2 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -53,14 +54,16 @@ public class SinkMetricData implements MetricData {
     private Counter dirtyBytes;
     private Meter numRecordsOutPerSecond;
     private Meter numBytesOutPerSecond;
+    private RegisteredMetric registeredMetric;
 
     public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
         this.labels = option.getLabels();
+        this.registeredMetric = option.getRegisteredMetric();
 
         ThreadSafeCounter recordsOutCounter = new ThreadSafeCounter();
         ThreadSafeCounter bytesOutCounter = new ThreadSafeCounter();
-        switch (option.getRegisteredMetric()) {
+        switch (registeredMetric) {
             case DIRTY:
                 registerMetricsForDirtyBytes(new ThreadSafeCounter());
                 registerMetricsForDirtyRecords(new ThreadSafeCounter());
@@ -291,18 +294,41 @@ public class SinkMetricData implements MetricData {
 
     @Override
     public String toString() {
-        return "SinkMetricData{"
-                + "metricGroup=" + metricGroup
-                + ", labels=" + labels
-                + ", auditImp=" + auditImp
-                + ", numRecordsOut=" + numRecordsOut.getCount()
-                + ", numBytesOut=" + numBytesOut.getCount()
-                + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
-                + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
-                + ", dirtyRecords=" + dirtyRecords.getCount()
-                + ", dirtyBytes=" + dirtyBytes.getCount()
-                + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
-                + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
-                + '}';
+        switch (registeredMetric) {
+            case DIRTY:
+                 return "SinkMetricData{"
+                        + "metricGroup=" + metricGroup
+                        + ", labels=" + labels
+                        + ", auditImp=" + auditImp
+                        + ", dirtyRecords=" + dirtyRecords.getCount()
+                        + ", dirtyBytes=" + dirtyBytes.getCount()
+                        + '}';
+            case NORMAL:
+                return "SinkMetricData{"
+                        + "metricGroup=" + metricGroup
+                        + ", labels=" + labels
+                        + ", auditImp=" + auditImp
+                        + ", numRecordsOut=" + numRecordsOut.getCount()
+                        + ", numBytesOut=" + numBytesOut.getCount()
+                        + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
+                        + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
+                        + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
+                        + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
+                        + '}';
+            default:
+                return "SinkMetricData{"
+                        + "metricGroup=" + metricGroup
+                        + ", labels=" + labels
+                        + ", auditImp=" + auditImp
+                        + ", numRecordsOut=" + numRecordsOut.getCount()
+                        + ", numBytesOut=" + numBytesOut.getCount()
+                        + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
+                        + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
+                        + ", dirtyRecords=" + dirtyRecords.getCount()
+                        + ", dirtyBytes=" + dirtyBytes.getCount()
+                        + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
+                        + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
+                        + '}';
+        }
     }
 }
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index dff8b9335..7761e9ebd 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -293,6 +293,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
         // no initialization needed
+        elasticsearchSinkFunction.setRuntimeContext(getRuntimeContext());
         elasticsearchSinkFunction.initializeState(context);
     }
 
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
index e1c019918..d68c0d6e9 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
@@ -69,4 +69,8 @@ public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
     default void snapshotState(FunctionSnapshotContext context) throws Exception {
 
     }
+
+    default  void setRuntimeContext(RuntimeContext ctx) {
+
+    }
 }
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index a5abc4690..3e68f3e43 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -117,6 +117,11 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
         }
     }
 
+    @Override
+    public void setRuntimeContext(RuntimeContext ctx) {
+        this.runtimeContext = ctx;
+    }
+
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
         if (this.inlongMetric != null) {
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
index 0afd5fb24..744fb8dc9 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
@@ -60,7 +60,8 @@ public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-            outputFormat.initializeState(context);
+        outputFormat.setRuntimeContext(getRuntimeContext());
+        outputFormat.initializeState(context);
     }
 
     @Override