You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2023/01/16 16:12:39 UTC
[inlong] branch master updated: [INLONG-7250][Sort] Output the read phase metrics for MySQL reader (#7251)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9606de07c [INLONG-7250][Sort] Output the read phase metrics for MySQL reader (#7251)
9606de07c is described below
commit 9606de07cb47195f459c4b5ed09cef40374c1ba2
Author: chestnufang <65...@users.noreply.github.com>
AuthorDate: Tue Jan 17 00:12:30 2023 +0800
[INLONG-7250][Sort] Output the read phase metrics for MySQL reader (#7251)
---
.../cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java | 10 ++++++++++
.../sort/cdc/mysql/source/reader/MySqlRecordEmitter.java | 5 +++++
2 files changed, 15 insertions(+)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index 2655ab7ac..a46076ef2 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.enums.ReadPhase;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
@@ -137,4 +138,13 @@ public class MySqlSourceReaderMetrics {
public SourceTableMetricData getSourceMetricData() {
return sourceTableMetricData;
}
+
+ /**
+ * output read phase metric
+ *
+ * @param readPhase the readPhase of record
+ */
+ public void outputReadPhaseMetrics(ReadPhase readPhase) {
+ sourceTableMetricData.outputReadPhaseMetrics(readPhase);
+ }
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index eb961df00..e9427ec3c 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -28,6 +28,7 @@ import io.debezium.relational.history.TableChanges.TableChange;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.base.enums.ReadPhase;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.base.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.inlong.sort.cdc.mysql.source.metrics.MySqlSourceReaderMetrics;
@@ -171,6 +172,10 @@ public final class MySqlRecordEmitter<T>
private void updateStartingOffsetForSplit(MySqlSplitState splitState, SourceRecord element) {
if (splitState.isBinlogSplitState()) {
+ // record the time metric to enter the incremental phase
+ if (sourceReaderMetrics != null) {
+ sourceReaderMetrics.outputReadPhaseMetrics(ReadPhase.INCREASE_PHASE);
+ }
BinlogOffset position = getBinlogPosition(element);
splitState.asBinlogSplitState().setStartingOffset(position);
}