You are viewing a plain-text version of this content. The canonical link to the original was not carried over into this text version.
Posted to commits@inlong.apache.org by yu...@apache.org on 2022/11/09 02:35:58 UTC

[inlong] branch master updated: [INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474)

This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 924df34b9 [INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474)
924df34b9 is described below

commit 924df34b9362c276a5183c7ef18c69da125e38ef
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Wed Nov 9 10:35:53 2022 +0800

    [INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474)
---
 .../org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java   | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index 80d6812f2..5ade85306 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -80,6 +80,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
 
@@ -438,6 +440,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
                 .withInlongAudit(inlongAudit)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {