You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2023/01/16 16:12:39 UTC

[inlong] branch master updated: [INLONG-7250][Sort] Output the read phase metrics for MySQL reader (#7251)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9606de07c [INLONG-7250][Sort] Output the read phase metrics for MySQL reader (#7251)
9606de07c is described below

commit 9606de07cb47195f459c4b5ed09cef40374c1ba2
Author: chestnufang <65...@users.noreply.github.com>
AuthorDate: Tue Jan 17 00:12:30 2023 +0800

    [INLONG-7250][Sort] Output the read phase metrics for MySQL reader (#7251)
---
 .../cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java     | 10 ++++++++++
 .../sort/cdc/mysql/source/reader/MySqlRecordEmitter.java       |  5 +++++
 2 files changed, 15 insertions(+)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index 2655ab7ac..a46076ef2 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.enums.ReadPhase;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
@@ -137,4 +138,13 @@ public class MySqlSourceReaderMetrics {
     public SourceTableMetricData getSourceMetricData() {
         return sourceTableMetricData;
     }
+
+    /**
+     * output read phase metric
+     *
+     * @param readPhase the readPhase of record
+     */
+    public void outputReadPhaseMetrics(ReadPhase readPhase) {
+        sourceTableMetricData.outputReadPhaseMetrics(readPhase);
+    }
 }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index eb961df00..e9427ec3c 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -28,6 +28,7 @@ import io.debezium.relational.history.TableChanges.TableChange;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.base.enums.ReadPhase;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.base.debezium.history.FlinkJsonTableChangeSerializer;
 import org.apache.inlong.sort.cdc.mysql.source.metrics.MySqlSourceReaderMetrics;
@@ -171,6 +172,10 @@ public final class MySqlRecordEmitter<T>
 
     private void updateStartingOffsetForSplit(MySqlSplitState splitState, SourceRecord element) {
         if (splitState.isBinlogSplitState()) {
+            // record the time metric to enter the incremental phase
+            if (sourceReaderMetrics != null) {
+                sourceReaderMetrics.outputReadPhaseMetrics(ReadPhase.INCREASE_PHASE);
+            }
             BinlogOffset position = getBinlogPosition(element);
             splitState.asBinlogSplitState().setStartingOffset(position);
         }