Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/26 11:44:12 UTC
[incubator-inlong] branch master updated: [INLONG-3953][Sort] Add MySQL dynamic table implementation - modified from Flink CDC (#3955)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 119e14583 [INLONG-3953][Sort] Add MySQL dynamic table implementation - modified from Flink CDC (#3955)
119e14583 is described below
commit 119e145837d8b1c5af5487a112a62f5fc40c90d3
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Tue Apr 26 19:44:07 2022 +0800
[INLONG-3953][Sort] Add MySQL dynamic table implementation - modified from Flink CDC (#3955)
* [INLONG-3953][Sort] add mysql table schema, modified from flinkCDC
* [INLONG-3953][Sort] fix checkstyle
* [INLONG-3953][Sort] add hikariCP dependency
---
inlong-sort/sort-single-tenant/pom.xml | 4 +
.../debezium/DebeziumDeserializationSchema.java | 12 +-
.../flink/cdc/debezium/DebeziumSourceFunction.java | 33 +-
.../JsonDebeziumDeserializationSchema.java | 98 ------
.../StringDebeziumDeserializationSchema.java | 49 ---
.../history/FlinkJsonTableChangeSerializer.java | 207 ------------
.../debezium/internal/DebeziumChangeConsumer.java | 103 ------
.../debezium/internal/DebeziumChangeFetcher.java | 309 -----------------
.../cdc/debezium/internal/DebeziumOffset.java | 64 ----
.../internal/DebeziumOffsetSerializer.java | 41 ---
.../debezium/internal/FlinkDatabaseHistory.java | 116 -------
.../internal/FlinkDatabaseSchemaHistory.java | 199 -----------
.../debezium/internal/FlinkOffsetBackingStore.java | 201 -----------
.../flink/cdc/debezium/internal/Handover.java | 194 -----------
.../flink/cdc/debezium/internal/SchemaRecord.java | 95 ------
.../cdc/debezium/utils/DatabaseHistoryUtil.java | 2 +-
.../flink/cdc/debezium/utils/ObjectUtils.java | 104 ++++++
.../flink/cdc/mysql/DebeziumUtils.java | 197 +++++++++++
.../singletenant/flink/cdc/mysql/MySqlSource.java | 240 +++++++++++++
.../flink/cdc/mysql/MySqlValidator.java | 161 +++++++++
.../cdc/mysql/SeekBinlogToTimestampFilter.java | 93 ++++++
.../flink/cdc/mysql/config/MySqlSourceConfig.java | 207 ++++++++++++
.../flink/cdc/mysql/config/MySqlSourceOptions.java | 237 +++++++++++++
.../flink/cdc/mysql/config/ServerIdRange.java | 112 +++++++
.../cdc/mysql/connection/ConnectionPoolId.java | 57 ++++
.../cdc/mysql/connection/ConnectionPools.java | 35 ++
.../mysql/connection/JdbcConnectionFactory.java | 76 +++++
.../cdc/mysql/connection/JdbcConnectionPools.java | 56 ++++
.../mysql/connection/PooledDataSourceFactory.java | 89 +++++
.../flink/cdc/mysql/table/JdbcUrlUtils.java | 52 +++
.../MySqlDeserializationConverterFactory.java | 153 +++++++++
.../cdc/mysql/table/MySqlReadableMetadata.java | 355 ++++++++++++++++++++
.../flink/cdc/mysql/table/MySqlTableSource.java | 370 +++++++++++++++++++++
.../cdc/mysql/table/MySqlTableSourceFactory.java | 308 +++++++++++++++++
.../flink/cdc/mysql/table/StartupMode.java | 36 ++
.../flink/cdc/mysql/table/StartupOptions.java | 129 +++++++
36 files changed, 3092 insertions(+), 1702 deletions(-)
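
For context, the MySqlTableSourceFactory added below is the kind of factory Flink SQL uses to resolve a connector identifier into the new MySqlTableSource. A minimal sketch of how such a dynamic table is typically registered follows; the connector identifier and option keys are assumptions based on the upstream Flink CDC connector this code is modified from, not values taken from this patch:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class MySqlCdcTableSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Register a CDC-backed dynamic table. The factory identifier and
            // option keys below follow upstream Flink CDC and are assumptions here.
            tEnv.executeSql(
                    "CREATE TABLE orders (\n"
                            + "  id INT,\n"
                            + "  price DECIMAL(10, 2),\n"
                            + "  PRIMARY KEY (id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'mysql-cdc',\n"
                            + "  'hostname' = 'localhost',\n"
                            + "  'port' = '3306',\n"
                            + "  'username' = 'inlong',\n"
                            + "  'password' = '***',\n"
                            + "  'database-name' = 'test_db',\n"
                            + "  'table-name' = 'orders'\n"
                            + ")");

            // Continuously prints the changelog produced from the MySQL binlog.
            tEnv.executeSql("SELECT * FROM orders").print();
        }
    }
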
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 29eb8ead2..a4e5ad628 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -210,6 +210,10 @@
<artifactId>debezium-core</artifactId>
<version>${debezium-core.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ </dependency>
</dependencies>
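
The HikariCP dependency (declared without a version, so presumably pinned in the parent POM's dependencyManagement) backs the new connection-pool classes such as JdbcConnectionPools and PooledDataSourceFactory. As a rough sketch of the pooling pattern they build on, using only HikariCP's public API with placeholder connection settings:

    import com.zaxxer.hikari.HikariConfig;
    import com.zaxxer.hikari.HikariDataSource;
    import java.sql.Connection;
    import java.sql.SQLException;

    public class MySqlPoolSketch {
        public static void main(String[] args) throws SQLException {
            HikariConfig config = new HikariConfig();
            config.setJdbcUrl("jdbc:mysql://localhost:3306/test_db"); // placeholder URL
            config.setUsername("inlong");
            config.setPassword("***");
            config.setMaximumPoolSize(20);
            config.setConnectionTimeout(30_000);

            try (HikariDataSource dataSource = new HikariDataSource(config);
                    Connection connection = dataSource.getConnection()) {
                // Connections are borrowed from and returned to the pool,
                // instead of opening a new JDBC connection per request.
                System.out.println(connection.isValid(5));
            }
        }
    }
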
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java
index a23fe10e3..c47d2220a 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java
@@ -21,6 +21,7 @@ package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
import io.debezium.relational.history.TableChanges.TableChange;
import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;
@@ -32,11 +33,14 @@ import org.apache.kafka.connect.source.SourceRecord;
* @param <T> The type created by the deserialization schema.
*/
@PublicEvolving
-public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T>,
+ com.ververica.cdc.debezium.DebeziumDeserializationSchema<T> {
- /** Deserialize the Debezium record, it is represented in Kafka {@link SourceRecord}. */
- void deserialize(SourceRecord record, Collector<T> out) throws Exception;
-
void deserialize(SourceRecord record, Collector<T> out, TableChange tableChange) throws Exception;
+ @Override
+ void deserialize(SourceRecord sourceRecord, Collector<T> collector) throws Exception;
+
+ @Override
+ TypeInformation<T> getProducedType();
}
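
With this change the interface extends com.ververica.cdc.debezium.DebeziumDeserializationSchema and adds a TableChange-aware deserialize overload. A minimal sketch of an implementation against the revised contract (the class name and the plain toString payload are illustrative only):

    import io.debezium.relational.history.TableChanges.TableChange;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.source.SourceRecord;

    public class ToStringDeserializationSchema implements
            org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) {
            // Plain Kafka Connect record, no schema-change context.
            collector.collect(sourceRecord.toString());
        }

        @Override
        public void deserialize(SourceRecord record, Collector<String> out, TableChange tableChange) {
            // TableChange carries the current table schema alongside the record.
            out.collect(record.toString());
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
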
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java
index 321b9ac14..60ef2bc4f 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java
@@ -21,15 +21,15 @@ package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumChangeConsumer;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumChangeFetcher;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffset;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffsetSerializer;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseHistory;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkOffsetBackingStore;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.Handover;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.SchemaRecord;
+import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
+import com.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
+import com.ververica.cdc.debezium.internal.Handover;
+import com.ververica.cdc.debezium.internal.SchemaRecord;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.embedded.Connect;
@@ -165,12 +165,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Accessor for state in the operator state backend. */
private transient ListState<byte[]> offsetState;
- /**
- * State to store the history records, i.e. schema changes.
- *
- * @see FlinkDatabaseHistory
- * @see FlinkDatabaseSchemaHistory
- */
private transient ListState<String> schemaRecordsState;
// ---------------------------------------------------------------------------------------
@@ -179,10 +173,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
private transient ExecutorService executor;
private transient DebeziumEngine<?> engine;
- /**
- * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
- * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
- */
+
private transient String engineInstanceName;
/** Consume the events from the engine and commit the offset to the engine. */
@@ -393,9 +384,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
this.debeziumChangeFetcher =
- new DebeziumChangeFetcher<>(
+ new DebeziumChangeFetcher<T>(
sourceContext,
- deserializer,
+ deserializer,
restoredOffsetState == null, // DB snapshot phase if restore state is null
dbzHeartbeatPrefix,
handover);
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java
deleted file mode 100644
index 1539a3570..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
-
-import io.debezium.relational.history.TableChanges.TableChange;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.storage.ConverterConfig;
-import org.apache.kafka.connect.storage.ConverterType;
-
-/**
- * A JSON format implementation of {@link DebeziumDeserializationSchema} which deserializes the
- * received {@link SourceRecord} to JSON String.
- */
-public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
-
- private static final long serialVersionUID = 1L;
-
- private transient JsonConverter jsonConverter;
-
- /**
- * Configuration whether to enable {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG} to include
- * schema in messages.
- */
- private final Boolean includeSchema;
-
- /** The custom configurations for {@link JsonConverter}. */
- private Map<String, Object> customConverterConfigs;
-
- public JsonDebeziumDeserializationSchema() {
- this(false);
- }
-
- public JsonDebeziumDeserializationSchema(Boolean includeSchema) {
- this.includeSchema = includeSchema;
- }
-
- public JsonDebeziumDeserializationSchema(
- Boolean includeSchema, Map<String, Object> customConverterConfigs) {
- this.includeSchema = includeSchema;
- this.customConverterConfigs = customConverterConfigs;
- }
-
- @Override
- public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
- if (jsonConverter == null) {
- initializeJsonConverter();
- }
- byte[] bytes =
- jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
- out.collect(new String(bytes));
- }
-
- @Override
- public void deserialize(SourceRecord record, Collector<String> out, TableChange tableChange)
- throws Exception {
-
- }
-
- /** Initialize {@link JsonConverter} with given configs. */
- private void initializeJsonConverter() {
- jsonConverter = new JsonConverter();
- final HashMap<String, Object> configs = new HashMap<>(2);
- configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
- configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
- if (customConverterConfigs != null) {
- configs.putAll(customConverterConfigs);
- }
- jsonConverter.configure(configs);
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java
deleted file mode 100644
index 1b7931abd..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
-
-import io.debezium.relational.history.TableChanges.TableChange;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.connect.source.SourceRecord;
-
-/**
- * A simple implementation of {@link DebeziumDeserializationSchema} which converts the received
- * {@link SourceRecord} into String.
- */
-public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
- private static final long serialVersionUID = -3168848963265670603L;
-
- @Override
- public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
- out.collect(record.toString());
- }
-
- @Override
- public void deserialize(SourceRecord record, Collector<String> out, TableChange tableChange)
- throws Exception {
-
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java
deleted file mode 100644
index 856d7fa83..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.history;
-
-import io.debezium.document.Array;
-import io.debezium.document.Array.Entry;
-import io.debezium.document.Document;
-import io.debezium.document.Value;
-import io.debezium.relational.Column;
-import io.debezium.relational.ColumnEditor;
-import io.debezium.relational.Table;
-import io.debezium.relational.TableEditor;
-import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges;
-import io.debezium.relational.history.TableChanges.TableChange;
-import io.debezium.relational.history.TableChanges.TableChangeType;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/**
- * The serializer responsible for converting of {@link TableChanges} into a JSON format. Copied from
- * io.debezium.relational.history.JsonTableChangeSerializer, but add serialization/deserialization
- * for column's enumValues
- */
-public class FlinkJsonTableChangeSerializer implements TableChanges.TableChangesSerializer<Array> {
-
- @Override
- public Array serialize(TableChanges tableChanges) {
- List<Value> values =
- StreamSupport.stream(tableChanges.spliterator(), false)
- .map(this::toDocument)
- .map(Value::create)
- .collect(Collectors.toList());
-
- return Array.create(values);
- }
-
- public Document toDocument(TableChange tableChange) {
- Document document = Document.create();
-
- document.setString("type", tableChange.getType().name());
- document.setString("id", tableChange.getId().toDoubleQuotedString());
- document.setDocument("table", toDocument(tableChange.getTable()));
- return document;
- }
-
- private Document toDocument(Table table) {
- Document document = Document.create();
-
- document.set("defaultCharsetName", table.defaultCharsetName());
- document.set("primaryKeyColumnNames", Array.create(table.primaryKeyColumnNames()));
-
- List<Document> columns =
- table.columns().stream().map(this::toDocument).collect(Collectors.toList());
-
- document.setArray("columns", Array.create(columns));
-
- return document;
- }
-
- private Document toDocument(Column column) {
- Document document = Document.create();
-
- document.setString("name", column.name());
- document.setNumber("jdbcType", column.jdbcType());
-
- if (column.nativeType() != Column.UNSET_INT_VALUE) {
- document.setNumber("nativeType", column.nativeType());
- }
-
- document.setString("typeName", column.typeName());
- document.setString("typeExpression", column.typeExpression());
- document.setString("charsetName", column.charsetName());
-
- if (column.length() != Column.UNSET_INT_VALUE) {
- document.setNumber("length", column.length());
- }
-
- column.scale().ifPresent(s -> document.setNumber("scale", s));
-
- document.setNumber("position", column.position());
- document.setBoolean("optional", column.isOptional());
- document.setBoolean("autoIncremented", column.isAutoIncremented());
- document.setBoolean("generated", column.isGenerated());
-
- // BEGIN FLINK MODIFICATION
- document.setArray("enumValues", column.enumValues().toArray());
- // END FLINK MODIFICATION
-
- return document;
- }
-
- @Override
- public TableChanges deserialize(Array array, boolean useCatalogBeforeSchema) {
- TableChanges tableChanges = new TableChanges();
-
- for (Entry entry : array) {
- TableChange change =
- fromDocument(entry.getValue().asDocument(), useCatalogBeforeSchema);
-
- if (change.getType() == TableChangeType.CREATE) {
- tableChanges.create(change.getTable());
- } else if (change.getType() == TableChangeType.ALTER) {
- tableChanges.alter(change.getTable());
- } else if (change.getType() == TableChangeType.DROP) {
- tableChanges.drop(change.getTable());
- }
- }
-
- return tableChanges;
- }
-
- private static Table fromDocument(TableId id, Document document) {
- TableEditor editor =
- Table.editor()
- .tableId(id)
- .setDefaultCharsetName(document.getString("defaultCharsetName"));
-
- document.getArray("columns")
- .streamValues()
- .map(Value::asDocument)
- .map(
- v -> {
- ColumnEditor columnEditor =
- Column.editor()
- .name(v.getString("name"))
- .jdbcType(v.getInteger("jdbcType"));
-
- Integer nativeType = v.getInteger("nativeType");
- if (nativeType != null) {
- columnEditor.nativeType(nativeType);
- }
-
- columnEditor
- .type(v.getString("typeName"), v.getString("typeExpression"))
- .charsetName(v.getString("charsetName"));
-
- Integer length = v.getInteger("length");
- if (length != null) {
- columnEditor.length(length);
- }
-
- Integer scale = v.getInteger("scale");
- if (scale != null) {
- columnEditor.scale(scale);
- }
-
- columnEditor
- .position(v.getInteger("position"))
- .optional(v.getBoolean("optional"))
- .autoIncremented(v.getBoolean("autoIncremented"))
- .generated(v.getBoolean("generated"));
-
- // BEGIN FLINK MODIFICATION
- Array enumValues = v.getArray("enumValues");
- if (enumValues != null && !enumValues.isEmpty()) {
- columnEditor.enumValues(
- enumValues
- .streamValues()
- .map(Value::asString)
- .collect(Collectors.toList()));
- }
- // END FLINK MODIFICATION
-
- return columnEditor.create();
- })
- .forEach(editor::addColumn);
-
- editor.setPrimaryKeyNames(
- document.getArray("primaryKeyColumnNames")
- .streamValues()
- .map(Value::asString)
- .collect(Collectors.toList()));
-
- return editor.create();
- }
-
- public static TableChange fromDocument(Document document, boolean useCatalogBeforeSchema) {
- TableChangeType type = TableChangeType.valueOf(document.getString("type"));
- TableId id = TableId.parse(document.getString("id"), useCatalogBeforeSchema);
- Table table = null;
-
- if (type == TableChangeType.CREATE || type == TableChangeType.ALTER) {
- table = fromDocument(id, document.getDocument("table"));
- } else {
- table = Table.editor().tableId(id).create();
- }
- return new TableChange(type, table);
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java
deleted file mode 100644
index bb31eadec..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import io.debezium.embedded.EmbeddedEngineChangeEvent;
-import io.debezium.engine.ChangeEvent;
-import io.debezium.engine.DebeziumEngine;
-import io.debezium.engine.DebeziumEngine.RecordCommitter;
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.annotation.Internal;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Consume debezium change events. */
-@Internal
-public class DebeziumChangeConsumer
- implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
- public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
- public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";
- private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
-
- private final Handover handover;
- // keep the modification is visible to the source function
- private volatile RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;
-
- public DebeziumChangeConsumer(Handover handover) {
- this.handover = handover;
- }
-
- @Override
- public void handleBatch(
- List<ChangeEvent<SourceRecord, SourceRecord>> events,
- RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
- try {
- currentCommitter = recordCommitter;
- handover.produce(events);
- } catch (Throwable e) {
- // Hold this exception in handover and trigger the fetcher to exit
- handover.reportError(e);
- }
- }
-
- @SuppressWarnings("unchecked")
- public void commitOffset(DebeziumOffset offset) throws InterruptedException {
- // Although the committer is read/write by multi-thread, the committer will be not changed
- // frequently.
- if (currentCommitter == null) {
- LOG.info(
- "commitOffset() called on Debezium change consumer which doesn't receive records yet.");
- return;
- }
-
- // only the offset is used
- SourceRecord recordWrapper =
- new SourceRecord(
- offset.sourcePartition,
- adjustSourceOffset((Map<String, Object>) offset.sourceOffset),
- "DUMMY",
- Schema.BOOLEAN_SCHEMA,
- true);
- EmbeddedEngineChangeEvent<SourceRecord, SourceRecord> changeEvent =
- new EmbeddedEngineChangeEvent<>(null, recordWrapper, recordWrapper);
- currentCommitter.markProcessed(changeEvent);
- currentCommitter.markBatchFinished();
- }
-
- /**
- * We have to adjust type of LSN values to Long, because it might be Integer after
- * deserialization, however {@code
- * io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(java.util.Map)}
- * requires Long.
- */
- private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
- if (sourceOffset.containsKey(LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
- String value = sourceOffset.get(LAST_COMPLETELY_PROCESSED_LSN_KEY).toString();
- sourceOffset.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value));
- }
- if (sourceOffset.containsKey(LAST_COMMIT_LSN_KEY)) {
- String value = sourceOffset.get(LAST_COMMIT_LSN_KEY).toString();
- sourceOffset.put(LAST_COMMIT_LSN_KEY, Long.parseLong(value));
- }
- return sourceOffset;
- }
-}
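
The deleted consumer's adjustSourceOffset guards against a JSON deserialization quirk: Jackson maps small JSON integers into Integer when the target is Map<String, Object>, while Debezium's Postgres commitOffset expects Long for LSN values. A standalone sketch of the pitfall, using plain Jackson rather than the flink-shaded copy seen above:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;

    public class LsnTypeSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            Map<String, Object> offset = mapper.readValue("{\"lsn_proc\": 23583488}", Map.class);

            // Jackson deserializes small JSON integers to Integer, but Debezium's
            // Postgres commitOffset(...) requires Long, hence the adjustment.
            System.out.println(offset.get("lsn_proc").getClass()); // class java.lang.Integer

            offset.put("lsn_proc", Long.parseLong(offset.get("lsn_proc").toString()));
            System.out.println(offset.get("lsn_proc").getClass()); // class java.lang.Long
        }
    }
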
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
deleted file mode 100644
index ea44e7026..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
-import io.debezium.connector.SnapshotRecord;
-import io.debezium.data.Envelope;
-import io.debezium.engine.ChangeEvent;
-import io.debezium.engine.DebeziumEngine;
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Handler that convert change messages from {@link DebeziumEngine} to data in Flink. Considering
- * Debezium in different mode has different strategies to hold the lock, e.g. snapshot, the handler
- * also needs different strategy. In snapshot phase, the handler needs to hold the lock until the
- * snapshot finishes. But in non-snapshot phase, the handler only needs to hold the lock when
- * emitting the records.
- *
- * @param <T> The type of elements produced by the handler.
- */
-@Internal
-public class DebeziumChangeFetcher<T> {
-
- private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class);
-
- private final SourceFunction.SourceContext<T> sourceContext;
-
- /**
- * The lock that guarantees that record emission and state updates are atomic, from the view of
- * taking a checkpoint.
- */
- private final Object checkpointLock;
-
- /** The schema to convert from Debezium's messages into Flink's objects. */
- private final DebeziumDeserializationSchema<T> deserialization;
-
- /** A collector to emit records in batch (bundle). */
- private final DebeziumCollector debeziumCollector;
-
- private final DebeziumOffset debeziumOffset;
-
- private final DebeziumOffsetSerializer stateSerializer;
-
- private final String heartbeatTopicPrefix;
-
- private boolean isInDbSnapshotPhase;
-
- private final Handover handover;
-
- private volatile boolean isRunning = true;
-
- // ---------------------------------------------------------------------------------------
- // Metrics
- // ---------------------------------------------------------------------------------------
-
- /** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
- private volatile long messageTimestamp = 0L;
-
- /** The last record processing time. */
- private volatile long processTime = 0L;
-
- /**
- * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
- * record fetched into the source operator.
- */
- private volatile long fetchDelay = 0L;
-
- /**
- * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
- * source operator.
- */
- private volatile long emitDelay = 0L;
-
- // ------------------------------------------------------------------------
-
- public DebeziumChangeFetcher(
- SourceFunction.SourceContext<T> sourceContext,
- DebeziumDeserializationSchema<T> deserialization,
- boolean isInDbSnapshotPhase,
- String heartbeatTopicPrefix,
- Handover handover) {
- this.sourceContext = sourceContext;
- this.checkpointLock = sourceContext.getCheckpointLock();
- this.deserialization = deserialization;
- this.isInDbSnapshotPhase = isInDbSnapshotPhase;
- this.heartbeatTopicPrefix = heartbeatTopicPrefix;
- this.debeziumCollector = new DebeziumCollector();
- this.debeziumOffset = new DebeziumOffset();
- this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
- this.handover = handover;
- }
-
- /**
- * Take a snapshot of the Debezium handler state.
- *
- * <p>Important: This method must be called under the checkpoint lock.
- */
- public byte[] snapshotCurrentState() throws Exception {
- // this method assumes that the checkpoint lock is held
- assert Thread.holdsLock(checkpointLock);
- if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
- return null;
- }
-
- return stateSerializer.serialize(debeziumOffset);
- }
-
- /**
- * Process change messages from the {@link Handover} and collect the processed messages by
- * {@link Collector}.
- */
- public void runFetchLoop() throws Exception {
- try {
- // begin snapshot database phase
- if (isInDbSnapshotPhase) {
- List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
-
- synchronized (checkpointLock) {
- LOG.info(
- "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
- handleBatch(events);
- while (isRunning && isInDbSnapshotPhase) {
- handleBatch(handover.pollNext());
- }
- }
- LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
- }
-
- // begin streaming binlog phase
- while (isRunning) {
- // If the handover is closed or has errors, exit.
- // If there is no streaming phase, the handover will be closed by the engine.
- handleBatch(handover.pollNext());
- }
- } catch (Handover.ClosedException e) {
- // ignore
- }
- }
-
- public void close() {
- isRunning = false;
- handover.close();
- }
-
- // ---------------------------------------------------------------------------------------
- // Metric getter
- // ---------------------------------------------------------------------------------------
-
- /**
- * The metric indicates delay from data generation to entry into the system.
- *
- * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is
- * unavailable.
- */
- public long getFetchDelay() {
- return fetchDelay;
- }
-
- /**
- * The metric indicates delay from data generation to leaving the source operator.
- *
- * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is
- * unavailable.
- */
- public long getEmitDelay() {
- return emitDelay;
- }
-
- public long getIdleTime() {
- return System.currentTimeMillis() - processTime;
- }
-
- // ---------------------------------------------------------------------------------------
- // Helper
- // ---------------------------------------------------------------------------------------
-
- private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
- throws Exception {
- if (CollectionUtils.isEmpty(changeEvents)) {
- return;
- }
- this.processTime = System.currentTimeMillis();
-
- for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
- SourceRecord record = event.value();
- updateMessageTimestamp(record);
- fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp;
-
- if (isHeartbeatEvent(record)) {
- // keep offset update
- synchronized (checkpointLock) {
- debeziumOffset.setSourcePartition(record.sourcePartition());
- debeziumOffset.setSourceOffset(record.sourceOffset());
- }
- // drop heartbeat events
- continue;
- }
-
- deserialization.deserialize(record, debeziumCollector);
-
- if (!isSnapshotRecord(record)) {
- LOG.debug("Snapshot phase finishes.");
- isInDbSnapshotPhase = false;
- }
-
- // emit the actual records. this also updates offset state atomically
- emitRecordsUnderCheckpointLock(
- debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
- }
- }
-
- private void emitRecordsUnderCheckpointLock(
- Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
- // Emit the records. Use the checkpoint lock to guarantee
- // atomicity of record emission and offset state update.
- // The synchronized checkpointLock is reentrant. It's safe to sync again in snapshot mode.
- synchronized (checkpointLock) {
- T record;
- while ((record = records.poll()) != null) {
- emitDelay =
- isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp;
- sourceContext.collect(record);
- }
- // update offset to state
- debeziumOffset.setSourcePartition(sourcePartition);
- debeziumOffset.setSourceOffset(sourceOffset);
- }
- }
-
- private void updateMessageTimestamp(SourceRecord record) {
- Schema schema = record.valueSchema();
- Struct value = (Struct) record.value();
- if (schema.field(Envelope.FieldName.SOURCE) == null) {
- return;
- }
-
- Struct source = value.getStruct(Envelope.FieldName.SOURCE);
- if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
- return;
- }
-
- Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
- if (tsMs != null) {
- this.messageTimestamp = tsMs;
- }
- }
-
- private boolean isHeartbeatEvent(SourceRecord record) {
- String topic = record.topic();
- return topic != null && topic.startsWith(heartbeatTopicPrefix);
- }
-
- private boolean isSnapshotRecord(SourceRecord record) {
- Struct value = (Struct) record.value();
- if (value != null) {
- Struct source = value.getStruct(Envelope.FieldName.SOURCE);
- SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
- // even if it is the last record of snapshot, i.e. SnapshotRecord.LAST
- // we can still recover from checkpoint and continue to read the binlog,
- // because the checkpoint contains binlog position
- return SnapshotRecord.TRUE == snapshotRecord;
- }
- return false;
- }
-
- // ---------------------------------------------------------------------------------------
-
- private class DebeziumCollector implements Collector<T> {
-
- private final Queue<T> records = new ArrayDeque<>();
-
- @Override
- public void collect(T record) {
- records.add(record);
- }
-
- @Override
- public void close() {
- }
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java
deleted file mode 100644
index a51aad8d4..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.flink.annotation.Internal;
-
-/**
- * The state that the Flink Debezium Consumer holds for each instance.
- *
- * <p>This class describes the most basic state that Debezium used for recovering based on Kafka
- * Connect mechanism. It includes a sourcePartition and sourceOffset.
- *
- * <p>The sourcePartition represents a single input sourcePartition that the record came from (e.g.
- * a filename, table name, or topic-partition). The sourceOffset represents a position in that
- * sourcePartition which can be used to resume consumption of data.
- *
- * <p>These values can have arbitrary structure and should be represented using
- * org.apache.kafka.connect.data objects (or primitive values). For example, a database connector
- * might specify the sourcePartition as a record containing { "db": "database_name", "table":
- * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
- */
-@Internal
-public class DebeziumOffset implements Serializable {
- private static final long serialVersionUID = 1L;
-
- public Map<String, ?> sourcePartition;
- public Map<String, ?> sourceOffset;
-
- public void setSourcePartition(Map<String, ?> sourcePartition) {
- this.sourcePartition = sourcePartition;
- }
-
- public void setSourceOffset(Map<String, ?> sourceOffset) {
- this.sourceOffset = sourceOffset;
- }
-
- @Override
- public String toString() {
- return "DebeziumOffset{"
- + "sourcePartition="
- + sourcePartition
- + ", sourceOffset="
- + sourceOffset
- + '}';
- }
-}
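
The Javadoc above describes the state as a sourcePartition/sourceOffset pair of arbitrary maps. As a concrete illustration of the shape such state takes for a MySQL binlog source (the key names follow Debezium's MySQL connector and the values are made up):

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetShapeSketch {
        public static void main(String[] args) {
            // Hypothetical sourcePartition: identifies the server the record came from.
            Map<String, Object> sourcePartition = new HashMap<>();
            sourcePartition.put("server", "mysql_binlog_source");

            // Hypothetical sourceOffset: a resumable binlog position.
            Map<String, Object> sourceOffset = new HashMap<>();
            sourceOffset.put("file", "mysql-bin.000003");
            sourceOffset.put("pos", 154L);

            System.out.println("partition=" + sourcePartition + ", offset=" + sourceOffset);
        }
    }
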
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java
deleted file mode 100644
index 9f193a8d9..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import java.io.IOException;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-/** Serializer implementation for a {@link DebeziumOffset}. */
-@Internal
-public class DebeziumOffsetSerializer {
- public static final DebeziumOffsetSerializer INSTANCE = new DebeziumOffsetSerializer();
-
- public byte[] serialize(DebeziumOffset debeziumOffset) throws IOException {
- // we currently use JSON serialization for simplification, as the state is very small.
- // we can improve this in the future if needed
- ObjectMapper objectMapper = new ObjectMapper();
- return objectMapper.writeValueAsBytes(debeziumOffset);
- }
-
- public DebeziumOffset deserialize(byte[] bytes) throws IOException {
- ObjectMapper objectMapper = new ObjectMapper();
- return objectMapper.readValue(bytes, DebeziumOffset.class);
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java
deleted file mode 100644
index 77315db48..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.removeHistory;
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-
-import io.debezium.config.Configuration;
-import io.debezium.relational.history.AbstractDatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryException;
-import io.debezium.relational.history.DatabaseHistoryListener;
-import io.debezium.relational.history.HistoryRecord;
-import io.debezium.relational.history.HistoryRecordComparator;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Consumer;
-
-/**
- * Inspired from {@link io.debezium.relational.history.MemoryDatabaseHistory} but we will store the
- * HistoryRecords in Flink's state for persistence.
- *
- * <p>Note: This is not a clean solution because we depends on a global variable and all the history
- * records will be stored in state (grow infinitely). We may need to come up with a
- * FileSystemDatabaseHistory in the future to store history in HDFS.
- */
-public class FlinkDatabaseHistory extends AbstractDatabaseHistory {
-
- public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";
-
- private ConcurrentLinkedQueue<SchemaRecord> schemaRecords;
- private String instanceName;
-
- /** Gets the registered HistoryRecords under the given instance name. */
- private ConcurrentLinkedQueue<SchemaRecord> getRegisteredHistoryRecord(String instanceName) {
- Collection<SchemaRecord> historyRecords = retrieveHistory(instanceName);
- return new ConcurrentLinkedQueue<>(historyRecords);
- }
-
- @Override
- public void configure(
- Configuration config,
- HistoryRecordComparator comparator,
- DatabaseHistoryListener listener,
- boolean useCatalogBeforeSchema) {
- super.configure(config, comparator, listener, useCatalogBeforeSchema);
- this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
- this.schemaRecords = getRegisteredHistoryRecord(instanceName);
-
- // register the schema changes into state
- // every change should be visible to the source function
- registerHistory(instanceName, schemaRecords);
- }
-
- @Override
- public void stop() {
- super.stop();
- removeHistory(instanceName);
- }
-
- @Override
- protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
- this.schemaRecords.add(new SchemaRecord(record));
- }
-
- @Override
- protected void recoverRecords(Consumer<HistoryRecord> records) {
- this.schemaRecords.stream().map(SchemaRecord::getHistoryRecord).forEach(records);
- }
-
- @Override
- public boolean exists() {
- return !schemaRecords.isEmpty();
- }
-
- @Override
- public boolean storageExists() {
- return true;
- }
-
- @Override
- public String toString() {
- return "Flink Database History";
- }
-
- /**
- * Determine whether the {@link FlinkDatabaseHistory} is compatible with the specified state.
- */
- public static boolean isCompatible(Collection<SchemaRecord> records) {
- for (SchemaRecord record : records) {
- // check the source/position/ddl is not null
- if (!record.isHistoryRecord()) {
- return false;
- } else {
- break;
- }
- }
- return true;
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
deleted file mode 100644
index 1027b18ed..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.removeHistory;
-import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
-import static io.debezium.relational.history.TableChanges.TableChange;
-
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
-import io.debezium.config.Configuration;
-import io.debezium.relational.TableId;
-import io.debezium.relational.Tables;
-import io.debezium.relational.ddl.DdlParser;
-import io.debezium.relational.history.DatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryException;
-import io.debezium.relational.history.DatabaseHistoryListener;
-import io.debezium.relational.history.HistoryRecord;
-import io.debezium.relational.history.HistoryRecordComparator;
-import io.debezium.relational.history.TableChanges;
-import io.debezium.schema.DatabaseSchema;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * The {@link FlinkDatabaseSchemaHistory} only stores the latest schema of the monitored tables.
- * When recovering from the checkpoint, it should apply all the tables to the {@link
- * DatabaseSchema}, which doesn't need to replay the history anymore.
- *
- * <p>Considering the data structure maintained in the {@link FlinkDatabaseSchemaHistory} is much
- * different from the {@link FlinkDatabaseHistory}, it's not compatible with the {@link
- * FlinkDatabaseHistory}. Because it only maintains the latest schema of the table rather than all
- * history DDLs, it's useful to prevent OOM when meet massive history DDLs.
- */
-public class FlinkDatabaseSchemaHistory implements DatabaseHistory {
-
- public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";
-
- private final FlinkJsonTableChangeSerializer tableChangesSerializer =
- new FlinkJsonTableChangeSerializer();
-
- private ConcurrentMap<TableId, SchemaRecord> latestTables;
- private String instanceName;
- private DatabaseHistoryListener listener;
- private boolean storeOnlyMonitoredTablesDdl;
- private boolean skipUnparseableDDL;
- private boolean useCatalogBeforeSchema;
-
- @Override
- public void configure(
- Configuration config,
- HistoryRecordComparator comparator,
- DatabaseHistoryListener listener,
- boolean useCatalogBeforeSchema) {
- this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
- this.listener = listener;
- this.storeOnlyMonitoredTablesDdl = config.getBoolean(STORE_ONLY_MONITORED_TABLES_DDL);
- this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
- this.useCatalogBeforeSchema = useCatalogBeforeSchema;
-
- // recover
- this.latestTables = new ConcurrentHashMap<>();
- for (SchemaRecord schemaRecord : retrieveHistory(instanceName)) {
- // validate here
- TableChange tableChange =
- FlinkJsonTableChangeSerializer.fromDocument(
- schemaRecord.toDocument(), useCatalogBeforeSchema);
- latestTables.put(tableChange.getId(), schemaRecord);
- }
- // register
- registerHistory(instanceName, latestTables.values());
- }
-
- @Override
- public void start() {
- listener.started();
- }
-
- @Override
- public void record(
- Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl)
- throws DatabaseHistoryException {
- throw new UnsupportedOperationException(
- String.format(
- "The %s cannot work with 'debezium.internal.implementation' = 'legacy',"
- + "please use %s",
- FlinkDatabaseSchemaHistory.class.getCanonicalName(),
- FlinkDatabaseHistory.class.getCanonicalName()));
- }
-
- @Override
- public void record(
- Map<String, ?> source,
- Map<String, ?> position,
- String databaseName,
- String schemaName,
- String ddl,
- TableChanges changes)
- throws DatabaseHistoryException {
- for (TableChanges.TableChange change : changes) {
- switch (change.getType()) {
- case CREATE:
- case ALTER:
- latestTables.put(
- change.getId(),
- new SchemaRecord(tableChangesSerializer.toDocument(change)));
- break;
- case DROP:
- latestTables.remove(change.getId());
- break;
- default:
- // impossible
- throw new RuntimeException(
- String.format("Unknown change type: %s.", change.getType()));
- }
- }
- listener.onChangeApplied(
- new HistoryRecord(source, position, databaseName, schemaName, ddl, changes));
- }
-
- @Override
- public void recover(
- Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
- listener.recoveryStarted();
- for (SchemaRecord record : latestTables.values()) {
- TableChange tableChange =
- FlinkJsonTableChangeSerializer.fromDocument(
- record.getTableChangeDoc(), useCatalogBeforeSchema);
- schema.overwriteTable(tableChange.getTable());
- }
- listener.recoveryStopped();
- }
-
- @Override
- public void stop() {
- if (instanceName != null) {
- removeHistory(instanceName);
- }
- listener.stopped();
- }
-
- @Override
- public boolean exists() {
- return latestTables != null && !latestTables.isEmpty();
- }
-
- @Override
- public boolean storageExists() {
- return true;
- }
-
- @Override
- public void initializeStorage() {
- // do nothing
- }
-
- @Override
- public boolean storeOnlyMonitoredTables() {
- return storeOnlyMonitoredTablesDdl;
- }
-
- @Override
- public boolean skipUnparseableDdlStatements() {
- return skipUnparseableDDL;
- }
-
- /**
- * Determine whether the {@link FlinkDatabaseSchemaHistory} is compatible with the specified
- * state.
- */
- public static boolean isCompatible(Collection<SchemaRecord> records) {
- for (SchemaRecord record : records) {
- if (!record.isTableChangeRecord()) {
- return false;
- } else {
- break;
- }
- }
- return true;
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java
deleted file mode 100644
index d36516c29..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
-import io.debezium.embedded.EmbeddedEngine;
-import io.debezium.engine.DebeziumEngine;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.kafka.common.utils.ThreadUtils;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.storage.OffsetStorageWriter;
-import org.apache.kafka.connect.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of {@link OffsetBackingStore} backed by Flink's state mechanism.
- *
- * <p>The {@link #OFFSET_STATE_VALUE} in the {@link WorkerConfig} is the raw position and offset
- * data in JSON format. It is set into the config by {@link DebeziumSourceFunction} when recovering
- * from a failover, before the {@link DebeziumEngine} starts up. If it is not a restoration, the
- * {@link #OFFSET_STATE_VALUE} is empty. {@link DebeziumEngine} relies on the {@link
- * OffsetBackingStore} for failover recovery.
- *
- * @see DebeziumSourceFunction
- */
-public class FlinkOffsetBackingStore implements OffsetBackingStore {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkOffsetBackingStore.class);
-
- public static final String OFFSET_STATE_VALUE = "offset.storage.flink.state.value";
- public static final int FLUSH_TIMEOUT_SECONDS = 10;
-
- protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
- protected ExecutorService executor;
-
- @Override
- public void configure(WorkerConfig config) {
- // eagerly initialize the executor, because OffsetStorageWriter will use it later
- start();
-
- Map<String, ?> conf = config.originals();
- if (!conf.containsKey(OFFSET_STATE_VALUE)) {
-            // a normal startup from a clean state, no need to initialize the offset
- return;
- }
-
- String stateJson = (String) conf.get(OFFSET_STATE_VALUE);
- DebeziumOffsetSerializer serializer = new DebeziumOffsetSerializer();
- DebeziumOffset debeziumOffset;
- try {
- debeziumOffset = serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8));
- } catch (IOException e) {
- LOG.error("Can't deserialize debezium offset state from JSON: " + stateJson, e);
- throw new RuntimeException(e);
- }
-
- final String engineName = (String) conf.get(EmbeddedEngine.ENGINE_NAME.name());
- Converter keyConverter = new JsonConverter();
- Converter valueConverter = new JsonConverter();
- keyConverter.configure(config.originals(), true);
- Map<String, Object> valueConfigs = new HashMap<>(conf);
- valueConfigs.put("schemas.enable", false);
- valueConverter.configure(valueConfigs, true);
- OffsetStorageWriter offsetWriter =
- new OffsetStorageWriter(
- this,
- // must use engineName as namespace to align with Debezium Engine
- // implementation
- engineName,
- keyConverter,
- valueConverter);
-
- offsetWriter.offset(debeziumOffset.sourcePartition, debeziumOffset.sourceOffset);
-
- // flush immediately
- if (!offsetWriter.beginFlush()) {
-            // if nothing needs to be flushed, there must be something wrong with the
- // initialization
- LOG.warn(
- "Initialize FlinkOffsetBackingStore from empty offset state, this shouldn't happen.");
- return;
- }
-
- // trigger flushing
- Future<Void> flushFuture =
- offsetWriter.doFlush(
- (error, result) -> {
- if (error != null) {
- LOG.error("Failed to flush initial offset.", error);
- } else {
- LOG.debug("Successfully flush initial offset.");
- }
- });
-
- // wait until flushing finished
- try {
- flushFuture.get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- LOG.info(
- "Flush offsets successfully, partition: {}, offsets: {}",
- debeziumOffset.sourcePartition,
- debeziumOffset.sourceOffset);
- } catch (InterruptedException e) {
- LOG.warn("Flush offsets interrupted, cancelling.", e);
- offsetWriter.cancelFlush();
- } catch (ExecutionException e) {
- LOG.error("Flush offsets threw an unexpected exception.", e);
- offsetWriter.cancelFlush();
- } catch (TimeoutException e) {
- LOG.error("Timed out waiting to flush offsets to storage.", e);
- offsetWriter.cancelFlush();
- }
- }
-
- @Override
- public void start() {
- if (executor == null) {
- executor =
- Executors.newFixedThreadPool(
- 1,
- ThreadUtils.createThreadFactory(
- this.getClass().getSimpleName() + "-%d", false));
- }
- }
-
- @Override
- public void stop() {
- if (executor != null) {
- executor.shutdown();
- // Best effort wait for any get() and set() tasks (and caller's callbacks) to complete.
- try {
- executor.awaitTermination(30, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- if (!executor.shutdownNow().isEmpty()) {
- throw new ConnectException(
- "Failed to stop FlinkOffsetBackingStore. Exiting without cleanly "
- + "shutting down pending tasks and/or callbacks.");
- }
- executor = null;
- }
- }
-
- @Override
- public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
- return executor.submit(
- () -> {
- Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
- for (ByteBuffer key : keys) {
- result.put(key, data.get(key));
- }
- return result;
- });
- }
-
- @Override
- public Future<Void> set(
- final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
- return executor.submit(
- () -> {
- for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
- data.put(entry.getKey(), entry.getValue());
- }
- if (callback != null) {
- callback.onCompletion(null, null);
- }
- return null;
- });
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java
deleted file mode 100644
index 9af2ff246..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import io.debezium.engine.ChangeEvent;
-import java.io.Closeable;
-import java.util.Collections;
-import java.util.List;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The Handover is a utility to hand over data (a buffer of records) and exception from a
- * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a "size one
- * blocking queue", with some extras around exception reporting, closing, and waking up thread
- * without {@link Thread#interrupt() interrupting} threads.
- *
- * <p>This class is used in the Flink Debezium Engine Consumer to hand over data and exceptions
- * between the thread that runs the DebeziumEngine class and the main thread.
- *
- * <p>The Handover can also be "closed", signalling from one thread to the other that the
- * thread has terminated.
- */
-@ThreadSafe
-@Internal
-public class Handover implements Closeable {
-
- private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
- private final Object lock = new Object();
-
- @GuardedBy("lock")
- private List<ChangeEvent<SourceRecord, SourceRecord>> next;
-
- @GuardedBy("lock")
- private Throwable error;
-
- private boolean wakeupProducer;
-
- /**
- * Polls the next element from the Handover, possibly blocking until the next element is
- * available. This method behaves similarly to polling from a blocking queue.
- *
- * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then that
- * exception is thrown rather than an element being returned.
- *
- * @return The next element (buffer of records, never null).
- * @throws ClosedException Thrown if the Handover was {@link #close() closed}.
- * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
- */
- public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
- synchronized (lock) {
- while (next == null && error == null) {
- lock.wait();
- }
- List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
- if (n != null) {
- next = null;
- lock.notifyAll();
- return n;
- } else {
- ExceptionUtils.rethrowException(error, error.getMessage());
-
- // this statement cannot be reached since the above method always throws an
-            // exception; this is only here to silence the compiler and any warnings
- return Collections.emptyList();
- }
- }
- }
-
- /**
- * Hands over an element from the producer. If the Handover already has an element that was not
- * yet picked up by the consumer thread, this call blocks until the consumer picks up that
- * previous element.
- *
- * <p>This behavior is similar to a "size one" blocking queue.
- *
- * @param element The next element to hand over.
- * @throws InterruptedException Thrown, if the thread is interrupted while blocking for the
- * Handover to be empty.
- */
- public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
- throws InterruptedException {
-
- checkNotNull(element);
-
- synchronized (lock) {
- while (next != null && !wakeupProducer) {
- lock.wait();
- }
-
- wakeupProducer = false;
-
- // an error marks this as closed for the producer
- if (error != null) {
- ExceptionUtils.rethrow(error, error.getMessage());
- } else {
- // if there is no error, then this is open and can accept this element
- next = element;
- lock.notifyAll();
- }
- }
- }
-
- /**
- * Reports an exception. The consumer will throw the given exception immediately, if it is
- * currently blocked in the {@link #pollNext()} method, or the next time it calls that method.
- *
- * <p>After this method has been called, no call to either {@link #produce(List)} or {@link
- * #pollNext()} will ever return regularly anymore, but will always return exceptionally.
- *
- * <p>If another exception was already reported, this method does nothing.
- *
- * <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
- *
- * @param t The exception to report.
- */
- public void reportError(Throwable t) {
- checkNotNull(t);
-
- synchronized (lock) {
- LOG.error("Reporting error:", t);
- // do not override the initial exception
- if (error == null) {
- error = t;
- }
- next = null;
- lock.notifyAll();
- }
- }
-
- /**
- * Return whether there is an error.
- *
- * @return whether there is an error
- */
- public boolean hasError() {
- return error != null;
- }
-
- /**
- * Closes the handover. Both the {@link #produce(List)} method and the {@link #pollNext()} will
- * throw a {@link ClosedException} on any currently blocking and future invocations.
- *
- * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
- * that exception will not be overridden. The consumer thread will throw that exception upon
- * calling {@link #pollNext()}, rather than the {@code ClosedException}.
- */
- @Override
- public void close() {
- synchronized (lock) {
- next = null;
- wakeupProducer = false;
-
- if (error == null) {
- error = new ClosedException();
- }
- lock.notifyAll();
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * An exception thrown by the Handover in the {@link #pollNext()} or {@link #produce(List)}
- * method, after the Handover was closed via {@link #close()}.
- */
- public static final class ClosedException extends Exception {
-
- private static final long serialVersionUID = 1L;
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java
deleted file mode 100644
index bcc48ce52..000000000
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
-
-import io.debezium.document.Document;
-import io.debezium.document.DocumentWriter;
-import io.debezium.relational.history.HistoryRecord;
-import io.debezium.relational.history.TableChanges.TableChange;
-import java.io.IOException;
-import javax.annotation.Nullable;
-
-/**
- * A record that represents a schema change event; it contains either one {@link HistoryRecord} or
- * one {@link TableChange}.
- *
- * <p>The {@link HistoryRecord} is used by {@link FlinkDatabaseHistory}, which keeps the full
- * history of table change events for all tables, while the {@link TableChange} is used by {@link
- * FlinkDatabaseSchemaHistory}, which keeps only the latest table change for each table.
- */
-public class SchemaRecord {
-
- @Nullable private final HistoryRecord historyRecord;
-
- @Nullable private final Document tableChangeDoc;
-
- public SchemaRecord(HistoryRecord historyRecord) {
- this.historyRecord = historyRecord;
- this.tableChangeDoc = null;
- }
-
- public SchemaRecord(Document document) {
- if (isHistoryRecordDocument(document)) {
- this.historyRecord = new HistoryRecord(document);
- this.tableChangeDoc = null;
- } else {
- this.tableChangeDoc = document;
- this.historyRecord = null;
- }
- }
-
- @Nullable
- public HistoryRecord getHistoryRecord() {
- return historyRecord;
- }
-
- @Nullable
- public Document getTableChangeDoc() {
- return tableChangeDoc;
- }
-
- public boolean isHistoryRecord() {
- return historyRecord != null;
- }
-
- public boolean isTableChangeRecord() {
- return tableChangeDoc != null;
- }
-
- public Document toDocument() {
- if (historyRecord != null) {
- return historyRecord.document();
- } else {
- return tableChangeDoc;
- }
- }
-
- @Override
- public String toString() {
- try {
- return DocumentWriter.defaultWriter().write(toDocument());
- } catch (IOException e) {
- return super.toString();
- }
- }
-
- private boolean isHistoryRecordDocument(Document document) {
- return new HistoryRecord(document).isValid();
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java
index b00d12024..958638234 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java
@@ -18,8 +18,8 @@
package org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils;
+import com.ververica.cdc.debezium.internal.SchemaRecord;
import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
-import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.SchemaRecord;
import io.debezium.relational.history.DatabaseHistory;
import java.util.Collection;
import java.util.Collections;
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/ObjectUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/ObjectUtils.java
new file mode 100644
index 000000000..5f4e2732f
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/ObjectUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/** Utilities for operation on {@link Object}. */
+public class ObjectUtils {
+
+ /**
+     * Returns a number {@code Object} whose value is {@code (number + augend)}. Note: this method
+     * does not consider numeric overflow because we don't want to change the object type.
+ */
+ public static Object plus(Object number, int augend) {
+ if (number instanceof Integer) {
+ return (int) number + augend;
+ } else if (number instanceof Long) {
+ return (long) number + augend;
+ } else if (number instanceof BigInteger) {
+ return ((BigInteger) number).add(BigInteger.valueOf(augend));
+ } else if (number instanceof BigDecimal) {
+ return ((BigDecimal) number).add(BigDecimal.valueOf(augend));
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported type %s for numeric plus.",
+ number.getClass().getSimpleName()));
+ }
+ }
+
+ /** Returns the difference {@code BigDecimal} whose value is {@code (minuend - subtrahend)}. */
+ public static BigDecimal minus(Object minuend, Object subtrahend) {
+ if (!minuend.getClass().equals(subtrahend.getClass())) {
+ throw new IllegalStateException(
+ String.format(
+ "Unsupported operand type, the minuend type %s is different with subtrahend type %s.",
+ minuend.getClass().getSimpleName(),
+ subtrahend.getClass().getSimpleName()));
+ }
+ if (minuend instanceof Integer) {
+ return BigDecimal.valueOf((int) minuend).subtract(BigDecimal.valueOf((int) subtrahend));
+ } else if (minuend instanceof Long) {
+ return BigDecimal.valueOf((long) minuend)
+ .subtract(BigDecimal.valueOf((long) subtrahend));
+ } else if (minuend instanceof BigInteger) {
+ return new BigDecimal(
+ ((BigInteger) minuend).subtract((BigInteger) subtrahend).toString());
+ } else if (minuend instanceof BigDecimal) {
+ return ((BigDecimal) minuend).subtract((BigDecimal) subtrahend);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported type %s for numeric minus.",
+ minuend.getClass().getSimpleName()));
+ }
+ }
+
+ /**
+     * Compares two objects. If both objects are of the same {@link Comparable} class, they are
+     * compared directly; otherwise they are compared by their {@code toString()} representations.
+     *
+     * @return The value {@code 0} if {@code obj1} is equal to {@code obj2}; a value less than
+     *     {@code 0} if {@code obj1} is less than {@code obj2}; and a value greater than {@code 0}
+     *     if {@code obj1} is greater than {@code obj2}.
+ */
+ @SuppressWarnings("unchecked")
+ public static int compare(Object obj1, Object obj2) {
+ if (obj1 instanceof Comparable && obj1.getClass().equals(obj2.getClass())) {
+ return ((Comparable) obj1).compareTo(obj2);
+ } else {
+ return obj1.toString().compareTo(obj2.toString());
+ }
+ }
+
+ /**
+     * Compares two {@code double} values numerically via {@link BigDecimal}.
+     *
+     * @return -1, 0, or 1 as {@code arg1} is numerically less than, equal to, or greater than
+     *     {@code arg2}.
+ */
+ public static int doubleCompare(double arg1, double arg2) {
+ BigDecimal bigDecimal1 = BigDecimal.valueOf(arg1);
+ BigDecimal bigDecimal2 = BigDecimal.valueOf(arg2);
+ return bigDecimal1.compareTo(bigDecimal2);
+ }
+}
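
For reference, a minimal sketch (not part of the patch) showing how these helpers behave; `ObjectUtilsDemo` is a made-up class name, and the expected outputs in the comments follow from the code above:

    import java.math.BigDecimal;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.ObjectUtils;

    public class ObjectUtilsDemo {
        public static void main(String[] args) {
            // plus keeps the operand's type, so a Long overflow silently wraps around
            System.out.println(ObjectUtils.plus(Long.MAX_VALUE, 1));       // -9223372036854775808
            // minus always widens to BigDecimal, whatever the operand type
            BigDecimal diff = ObjectUtils.minus(10L, 3L);
            System.out.println(diff);                                      // 7
            // compare uses Comparable only when both classes match
            System.out.println(ObjectUtils.compare(5, 7));                 // -1
            // doubleCompare goes through BigDecimal, so it sees the binary rounding error
            System.out.println(ObjectUtils.doubleCompare(0.1 + 0.2, 0.3)); // 1
        }
    }
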
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/DebeziumUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/DebeziumUtils.java
new file mode 100644
index 000000000..008bbdd51
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/DebeziumUtils.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.connector.mysql.MySqlDatabaseSchema;
+import io.debezium.connector.mysql.MySqlTopicSelector;
+import io.debezium.connector.mysql.MySqlValueConverters;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.jdbc.TemporalPrecisionMode;
+import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.SchemaNameAdjuster;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection.JdbcConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities related to Debezium.
+ */
+public class DebeziumUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class);
+
+ /**
+     * Creates and opens a new {@link JdbcConnection} backed by the connection pool.
+ */
+ public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) {
+ JdbcConnection jdbc =
+ new JdbcConnection(
+ sourceConfig.getDbzConfiguration(),
+ new JdbcConnectionFactory(sourceConfig));
+ try {
+ jdbc.connect();
+ } catch (Exception e) {
+ LOG.error("Failed to open MySQL connection", e);
+ throw new FlinkRuntimeException(e);
+ }
+ return jdbc;
+ }
+
+ /**
+     * Creates a new {@link MySqlConnection} but does not open the connection.
+ */
+ public static MySqlConnection createMySqlConnection(MySqlSourceConfig sourceConfig) {
+ return new MySqlConnection(
+ new MySqlConnection.MySqlConnectionConfiguration(sourceConfig.getDbzConfiguration()));
+ }
+
+ /**
+     * Creates a new {@link MySqlConnection} but does not open the connection.
+ */
+ public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) {
+ return new MySqlConnection(
+ new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
+ }
+
+ /**
+     * Creates a new {@link BinaryLogClient} for consuming the MySQL binlog.
+ */
+ public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) {
+ final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dbzConfiguration);
+ return new BinaryLogClient(
+ connectorConfig.hostname(),
+ connectorConfig.port(),
+ connectorConfig.username(),
+ connectorConfig.password());
+ }
+
+ /**
+     * Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySQL database schemas.
+ */
+ public static MySqlDatabaseSchema createMySqlDatabaseSchema(
+ MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
+ TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(dbzMySqlConfig);
+ SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
+ MySqlValueConverters valueConverters = getValueConverters(dbzMySqlConfig);
+ return new MySqlDatabaseSchema(
+ dbzMySqlConfig,
+ valueConverters,
+ topicSelector,
+ schemaNameAdjuster,
+ isTableIdCaseSensitive);
+ }
+
+ /**
+     * Fetches the current binlog offset from the MySQL server.
+ */
+ public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
+ final String showMasterStmt = "SHOW MASTER STATUS";
+ try {
+ return jdbc.queryAndMap(
+ showMasterStmt,
+ rs -> {
+ if (rs.next()) {
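+                            // SHOW MASTER STATUS columns: File, Position, Binlog_Do_DB,
+                            // Binlog_Ignore_DB[, Executed_Gtid_Set on GTID-enabled servers]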
+ final String binlogFilename = rs.getString(1);
+ final long binlogPosition = rs.getLong(2);
+ final String gtidSet =
+ rs.getMetaData().getColumnCount() > 4 ? rs.getString(5) : null;
+ return new BinlogOffset(
+ binlogFilename, binlogPosition, 0L, 0, 0, gtidSet, null);
+ } else {
+ throw new FlinkRuntimeException(
+ "Cannot read the binlog filename and position via '"
+ + showMasterStmt
+ + "'. Make sure your server is correctly configured");
+ }
+ });
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException(
+ "Cannot read the binlog filename and position via '"
+ + showMasterStmt
+ + "'. Make sure your server is correctly configured",
+ e);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static MySqlValueConverters getValueConverters(MySqlConnectorConfig dbzMySqlConfig) {
+ TemporalPrecisionMode timePrecisionMode = dbzMySqlConfig.getTemporalPrecisionMode();
+ JdbcValueConverters.DecimalMode decimalMode = dbzMySqlConfig.getDecimalMode();
+ String bigIntUnsignedHandlingModeStr =
+ dbzMySqlConfig
+ .getConfig()
+ .getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
+ MySqlConnectorConfig.BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode =
+ MySqlConnectorConfig.BigIntUnsignedHandlingMode.parse(
+ bigIntUnsignedHandlingModeStr);
+ JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode =
+ bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
+
+ boolean timeAdjusterEnabled =
+ dbzMySqlConfig.getConfig().getBoolean(MySqlConnectorConfig.ENABLE_TIME_ADJUSTER);
+ return new MySqlValueConverters(
+ decimalMode,
+ timePrecisionMode,
+ bigIntUnsignedMode,
+ dbzMySqlConfig.binaryHandlingMode(),
+ timeAdjusterEnabled ? MySqlValueConverters::adjustTemporal : x -> x,
+ MySqlValueConverters::defaultParsingErrorHandler);
+ }
+
+ public static Map<String, String> readMySqlSystemVariables(JdbcConnection connection) {
+        // Read the system variables from the MySQL instance
+ return querySystemVariables(connection, "SHOW VARIABLES");
+ }
+
+ private static Map<String, String> querySystemVariables(
+ JdbcConnection connection, String statement) {
+ final Map<String, String> variables = new HashMap<>();
+ try {
+ connection.query(
+ statement,
+ rs -> {
+ while (rs.next()) {
+ String varName = rs.getString(1);
+ String value = rs.getString(2);
+ if (varName != null && value != null) {
+ variables.put(varName, value);
+ }
+ }
+ });
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException("Error reading MySQL variables: " + e.getMessage(), e);
+ }
+
+ return variables;
+ }
+
+}
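
As a usage sketch of the helpers above (not part of the patch; host and credentials are placeholders, and the user is assumed to have the REPLICATION CLIENT privilege required by SHOW MASTER STATUS):

    import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
    import io.debezium.config.Configuration;
    import io.debezium.connector.mysql.MySqlConnection;
    import org.apache.inlong.sort.singletenant.flink.cdc.mysql.DebeziumUtils;

    public class BinlogOffsetProbe {
        public static void main(String[] args) throws Exception {
            Configuration config = Configuration.create()
                    .with("database.hostname", "localhost") // placeholder
                    .with("database.port", "3306")
                    .with("database.user", "inlong")        // placeholder
                    .with("database.password", "secret")    // placeholder
                    .build();
            // createMySqlConnection does not open the connection; the query connects lazily
            try (MySqlConnection connection = DebeziumUtils.createMySqlConnection(config)) {
                BinlogOffset offset = DebeziumUtils.currentBinlogOffset(connection);
                System.out.println(offset); // current binlog file name and position
            }
        }
    }
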
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlSource.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlSource.java
new file mode 100644
index 000000000..adbdb0132
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlSource.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql;
+
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.table.StartupOptions;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
+import io.debezium.connector.mysql.MySqlConnector;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction.LEGACY_IMPLEMENTATION_KEY;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction.LEGACY_IMPLEMENTATION_VALUE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@Deprecated
+public class MySqlSource {
+
+ private static final String DATABASE_SERVER_NAME = "mysql_binlog_source";
+
+ public static <T> Builder<T> builder() {
+ return new Builder<T>();
+ }
+
+ @Deprecated
+ public static class Builder<T> {
+
+ private int port = 3306; // default 3306 port
+ private String hostname;
+ private String[] databaseList;
+ private String username;
+ private String password;
+ private Integer serverId;
+ private String serverTimeZone;
+ private String[] tableList;
+ private Properties dbzProperties;
+ private StartupOptions startupOptions = StartupOptions.initial();
+ private DebeziumDeserializationSchema<RowData> deserializer;
+
+ public Builder<T> hostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ /** Integer port number of the MySQL database server. */
+ public Builder<T> port(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /**
+ * An optional list of regular expressions that match database names to be monitored; any
+         * database name not included in the whitelist will be excluded from monitoring. By default,
+         * all databases are monitored.
+ */
+ public Builder<T> databaseList(String... databaseList) {
+ this.databaseList = databaseList;
+ return this;
+ }
+
+ /**
+ * An optional list of regular expressions that match fully-qualified table identifiers for
+ * tables to be monitored; any table not included in the list will be excluded from
+         * monitoring. Each identifier is of the form databaseName.tableName. By default, the
+         * connector monitors every non-system table in each monitored database.
+ */
+ public Builder<T> tableList(String... tableList) {
+ this.tableList = tableList;
+ return this;
+ }
+
+        /** Name of the MySQL user to use when connecting to the MySQL database server. */
+ public Builder<T> username(String username) {
+ this.username = username;
+ return this;
+ }
+
+ /** Password to use when connecting to the MySQL database server. */
+ public Builder<T> password(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
+         * The session time zone in the database server, e.g. "America/Los_Angeles". It controls how
+         * the TIMESTAMP type in MySQL is converted to STRING. See
+ * https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types
+ */
+ public Builder<T> serverTimeZone(String timeZone) {
+ this.serverTimeZone = timeZone;
+ return this;
+ }
+
+ /**
+ * A numeric ID of this database client, which must be unique across all currently-running
+ * database processes in the MySQL cluster. This connector joins the MySQL database cluster
+ * as another server (with this unique ID) so it can read the binlog. By default, a random
+ * number is generated between 5400 and 6400, though we recommend setting an explicit value.
+ */
+ public Builder<T> serverId(int serverId) {
+ this.serverId = serverId;
+ return this;
+ }
+
+ /** The Debezium MySQL connector properties. For example, "snapshot.mode". */
+ public Builder<T> debeziumProperties(Properties properties) {
+ this.dbzProperties = properties;
+ return this;
+ }
+
+ /**
+ * The deserializer used to convert from consumed {@link
+ * org.apache.kafka.connect.source.SourceRecord}.
+ */
+ public Builder<T> deserializer(DebeziumDeserializationSchema<RowData> deserializer) {
+ this.deserializer = deserializer;
+ return this;
+ }
+
+ /** Specifies the startup options. */
+ public Builder<T> startupOptions(StartupOptions startupOptions) {
+ this.startupOptions = startupOptions;
+ return this;
+ }
+
+ public DebeziumSourceFunction<RowData> build() {
+ Properties props = new Properties();
+ props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
+            // Hard-code the server name because we don't need to distinguish it. From the docs:
+            // Logical name that identifies and provides a namespace for the particular MySQL
+            // database server/cluster being monitored. The logical name should be unique across
+            // all other connectors, since it is used as a prefix for all Kafka topic names
+            // emanating from this connector. Only alphanumeric characters and underscores
+            // should be used.
+ props.setProperty("database.server.name", DATABASE_SERVER_NAME);
+ props.setProperty("database.hostname", checkNotNull(hostname));
+ props.setProperty("database.user", checkNotNull(username));
+ props.setProperty("database.password", checkNotNull(password));
+ props.setProperty("database.port", String.valueOf(port));
+ props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
+ // debezium use "long" mode to handle unsigned bigint by default,
+ // but it'll cause lose of precise when the value is larger than 2^63,
+ // so use "precise" mode to avoid it.
+ props.put("bigint.unsigned.handling.mode", "precise");
+
+ if (serverId != null) {
+ props.setProperty("database.server.id", String.valueOf(serverId));
+ }
+ if (databaseList != null) {
+ props.setProperty("database.whitelist", String.join(",", databaseList));
+ }
+ if (tableList != null) {
+ props.setProperty("table.whitelist", String.join(",", tableList));
+ }
+ if (serverTimeZone != null) {
+ props.setProperty("database.serverTimezone", serverTimeZone);
+ }
+
+ DebeziumOffset specificOffset = null;
+ switch (startupOptions.startupMode) {
+ case INITIAL:
+ props.setProperty("snapshot.mode", "initial");
+ break;
+
+ case EARLIEST_OFFSET:
+ props.setProperty("snapshot.mode", "never");
+ break;
+
+ case LATEST_OFFSET:
+ props.setProperty("snapshot.mode", "schema_only");
+ break;
+
+ case SPECIFIC_OFFSETS:
+ // if binlog offset is specified, 'snapshot.mode=schema_only_recovery' must
+ // be configured. It only snapshots the schemas, not the data,
+                    // and continues binlog reading from the specified offset
+ props.setProperty("snapshot.mode", "schema_only_recovery");
+
+ specificOffset = new DebeziumOffset();
+ Map<String, String> sourcePartition = new HashMap<>();
+ sourcePartition.put("server", DATABASE_SERVER_NAME);
+ specificOffset.setSourcePartition(sourcePartition);
+
+ Map<String, Object> sourceOffset = new HashMap<>();
+ sourceOffset.put("file", startupOptions.specificOffsetFile);
+ sourceOffset.put("pos", startupOptions.specificOffsetPos);
+ specificOffset.setSourceOffset(sourceOffset);
+ break;
+
+ case TIMESTAMP:
+ checkNotNull(deserializer);
+ props.setProperty("snapshot.mode", "never");
+ deserializer =
+ new SeekBinlogToTimestampFilter<>(
+ startupOptions.startupTimestampMillis, deserializer);
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ if (dbzProperties != null) {
+ props.putAll(dbzProperties);
+            // Add default configurations for compatibility when the legacy MySQL connector
+            // implementation is set
+ if (LEGACY_IMPLEMENTATION_VALUE.equals(
+ dbzProperties.get(LEGACY_IMPLEMENTATION_KEY))) {
+ props.put("transforms", "snapshotasinsert");
+ props.put(
+ "transforms.snapshotasinsert.type",
+ "io.debezium.connector.mysql.transforms.ReadToInsertEvent");
+ }
+ }
+
+ return new DebeziumSourceFunction<>(
+ deserializer, props, specificOffset, new MySqlValidator(props));
+ }
+ }
+}
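
A usage sketch of the builder (not part of the patch); `env` is assumed to be a StreamExecutionEnvironment and `myRowDataDeserializer` any DebeziumDeserializationSchema<RowData>:

    DebeziumSourceFunction<RowData> source =
            MySqlSource.<RowData>builder()
                    .hostname("localhost")                    // placeholder
                    .port(3306)
                    .databaseList("inventory")                // regex on database names
                    .tableList("inventory\\.products")        // regex on databaseName.tableName
                    .username("inlong")                       // placeholder
                    .password("secret")                       // placeholder
                    .serverId(5401)                           // must be unique in the MySQL cluster
                    .startupOptions(StartupOptions.initial()) // snapshot first, then read binlog
                    .deserializer(myRowDataDeserializer)
                    .build();
    env.addSource(source).print();
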
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlValidator.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlValidator.java
new file mode 100644
index 000000000..eb7cea3ba
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/MySqlValidator.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql;
+
+import io.debezium.config.Configuration;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.Validator;
+import io.debezium.jdbc.JdbcConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * The validator for MySQL: it requires the database version to be 5.6 or higher, the binlog
+ * format to be ROW, and the binlog row image to be FULL.
+ */
+public class MySqlValidator implements Validator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MySqlValidator.class);
+ private static final long serialVersionUID = 1L;
+
+ private static final String BINLOG_FORMAT_ROW = "ROW";
+ private static final String BINLOG_FORMAT_IMAGE_FULL = "FULL";
+
+ private final Properties dbzProperties;
+ private final MySqlSourceConfig sourceConfig;
+
+ public MySqlValidator(Properties dbzProperties) {
+ this.dbzProperties = dbzProperties;
+ this.sourceConfig = null;
+ }
+
+ public MySqlValidator(MySqlSourceConfig sourceConfig) {
+ this.dbzProperties = sourceConfig.getDbzProperties();
+ this.sourceConfig = sourceConfig;
+ }
+
+ @Override
+ public void validate() {
+ JdbcConnection connection = null;
+ try {
+ if (sourceConfig != null) {
+ connection = DebeziumUtils.openJdbcConnection(sourceConfig);
+ } else {
+ // for the legacy source
+ connection = DebeziumUtils.createMySqlConnection(Configuration.from(dbzProperties));
+ }
+ checkVersion(connection);
+ checkBinlogFormat(connection);
+ checkBinlogRowImage(connection);
+ } catch (SQLException ex) {
+ throw new TableException(
+ "Unexpected error while connecting to MySQL and validating", ex);
+ } finally {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException("Closing connection error", e);
+ }
+ }
+ }
+ LOG.info("MySQL validation passed.");
+ }
+
+ private void checkVersion(JdbcConnection connection) throws SQLException {
+ String version =
+ connection.queryAndMap("SELECT VERSION()", rs -> rs.next() ? rs.getString(1) : "");
+
+ // Only care about the major version and minor version
+ Integer[] versionNumbers =
+ Arrays.stream(version.split("\\."))
+ .limit(2)
+ .map(Integer::new)
+ .toArray(Integer[]::new);
+ boolean isSatisfied;
+ if (versionNumbers[0] > 5) {
+ isSatisfied = true;
+ } else if (versionNumbers[0] < 5) {
+ isSatisfied = false;
+ } else {
+ isSatisfied = versionNumbers[1] >= 6;
+ }
+ if (!isSatisfied) {
+ throw new ValidationException(
+ String.format(
+ "Currently Flink MySql CDC connector only supports MySql "
+ + "whose version is larger or equal to 5.6, but actual is %s.%s.",
+ versionNumbers[0], versionNumbers[1]));
+ }
+ }
+
+ /** Check whether the binlog format is ROW. */
+ private void checkBinlogFormat(JdbcConnection connection) throws SQLException {
+ String mode =
+ connection
+ .queryAndMap(
+ "SHOW GLOBAL VARIABLES LIKE 'binlog_format'",
+ rs -> rs.next() ? rs.getString(2) : "")
+ .toUpperCase();
+ if (!BINLOG_FORMAT_ROW.equals(mode)) {
+ throw new ValidationException(
+ String.format(
+ "The MySQL server is configured with binlog_format %s rather than %s, which is "
+ + "required for this connector to work properly. "
+ + "Change the MySQL configuration to use a "
+ + "binlog_format=ROW and restart the connector.",
+ mode, BINLOG_FORMAT_ROW));
+ }
+ }
+
+ /** Check whether the binlog row image is FULL. */
+ private void checkBinlogRowImage(JdbcConnection connection) throws SQLException {
+ String rowImage =
+ connection
+ .queryAndMap(
+ "SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'",
+ rs -> {
+ if (rs.next()) {
+ return rs.getString(2);
+ }
+                                // This setting was introduced in MySQL 5.6+ with a default
+                                // of 'FULL'. For older versions, assume 'FULL'.
+ return BINLOG_FORMAT_IMAGE_FULL;
+ })
+ .toUpperCase();
+ if (!rowImage.equals(BINLOG_FORMAT_IMAGE_FULL)) {
+ throw new ValidationException(
+ String.format(
+ "The MySQL server is configured with binlog_row_image %s rather than %s, which is "
+ + "required for this connector to work properly. "
+ + "Change the MySQL configuration to use a "
+ + "binlog_row_image=FULL and restart the connector.",
+ rowImage, BINLOG_FORMAT_IMAGE_FULL));
+ }
+ }
+}
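
A minimal sketch (not part of the patch) of running the validator standalone through its legacy Properties constructor; the connection settings are placeholders:

    java.util.Properties props = new java.util.Properties();
    props.setProperty("database.hostname", "localhost"); // placeholder
    props.setProperty("database.port", "3306");
    props.setProperty("database.user", "inlong");        // placeholder
    props.setProperty("database.password", "secret");    // placeholder
    // Throws ValidationException if the server version is below 5.6,
    // binlog_format is not ROW, or binlog_row_image is not FULL.
    new MySqlValidator(props).validate();
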
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/SeekBinlogToTimestampFilter.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/SeekBinlogToTimestampFilter.java
new file mode 100644
index 000000000..a2613510b
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/SeekBinlogToTimestampFilter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DebeziumDeserializationSchema} which wraps a real {@link DebeziumDeserializationSchema}
+ * to seek the binlog to a specific timestamp.
+ */
+public class SeekBinlogToTimestampFilter<T> implements DebeziumDeserializationSchema<T> {
+ private static final long serialVersionUID = -4450118969976653497L;
+ protected static final Logger LOG = LoggerFactory.getLogger(SeekBinlogToTimestampFilter.class);
+
+ private final long startupTimestampMillis;
+ private final DebeziumDeserializationSchema<T> serializer;
+
+ private transient boolean find = false;
+ private transient long filtered = 0L;
+
+ public SeekBinlogToTimestampFilter(
+ long startupTimestampMillis, DebeziumDeserializationSchema<T> serializer) {
+ this.startupTimestampMillis = startupTimestampMillis;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+ if (find) {
+ serializer.deserialize(record, out);
+ return;
+ }
+
+ if (filtered == 0) {
+ LOG.info("Begin to seek binlog to the specific timestamp {}.", startupTimestampMillis);
+ }
+
+ Struct value = (Struct) record.value();
+ Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+ Long ts = source.getInt64(Envelope.FieldName.TIMESTAMP);
+ if (ts != null && ts >= startupTimestampMillis) {
+ serializer.deserialize(record, out);
+ find = true;
+ LOG.info(
+ "Successfully seek to the specific timestamp {} with filtered {} change events.",
+ startupTimestampMillis,
+ filtered);
+ } else {
+ filtered++;
+ if (filtered % 10000 == 0) {
+ LOG.info(
+ "Seeking binlog to specific timestamp with filtered {} change events.",
+ filtered);
+ }
+ }
+ }
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<T> out, TableChange tableChange)
+ throws Exception {
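+        // no-op: schema-change records are not used by this filter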
+
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return serializer.getProducedType();
+ }
+}
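
The builder above wires this filter in automatically for the TIMESTAMP startup mode; wrapping a deserializer manually looks like the sketch below (`inner` is a placeholder for any DebeziumDeserializationSchema<RowData>):

    long startupTimestampMillis = 1650960000000L; // made-up epoch-millis start point
    DebeziumDeserializationSchema<RowData> seeking =
            new SeekBinlogToTimestampFilter<>(startupTimestampMillis, inner);
    // Events whose source timestamp is below startupTimestampMillis are dropped;
    // from the first matching event onwards, everything is forwarded to `inner`.
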
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceConfig.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceConfig.java
new file mode 100644
index 000000000..835778fed
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceConfig.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.config;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.relational.RelationalTableFilters;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.table.StartupOptions;
+
+public class MySqlSourceConfig implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String hostname;
+ private final int port;
+ private final String username;
+ private final String password;
+ private final List<String> databaseList;
+ private final List<String> tableList;
+ @Nullable private final ServerIdRange serverIdRange;
+ private final StartupOptions startupOptions;
+ private final int splitSize;
+ private final int splitMetaGroupSize;
+ private final int fetchSize;
+ private final String serverTimeZone;
+ private final Duration connectTimeout;
+ private final int connectMaxRetries;
+ private final int connectionPoolSize;
+ private final double distributionFactorUpper;
+ private final double distributionFactorLower;
+ private final boolean includeSchemaChanges;
+ private final boolean scanNewlyAddedTableEnabled;
+ private final Properties jdbcProperties;
+
+ // --------------------------------------------------------------------------------------------
+ // Debezium Configurations
+ // --------------------------------------------------------------------------------------------
+ private final Properties dbzProperties;
+ private final Configuration dbzConfiguration;
+ private final MySqlConnectorConfig dbzMySqlConfig;
+
+ MySqlSourceConfig(
+ String hostname,
+ int port,
+ String username,
+ String password,
+ List<String> databaseList,
+ List<String> tableList,
+ @Nullable ServerIdRange serverIdRange,
+ StartupOptions startupOptions,
+ int splitSize,
+ int splitMetaGroupSize,
+ int fetchSize,
+ String serverTimeZone,
+ Duration connectTimeout,
+ int connectMaxRetries,
+ int connectionPoolSize,
+ double distributionFactorUpper,
+ double distributionFactorLower,
+ boolean includeSchemaChanges,
+ boolean scanNewlyAddedTableEnabled,
+ Properties dbzProperties,
+ Properties jdbcProperties) {
+ this.hostname = checkNotNull(hostname);
+ this.port = port;
+ this.username = checkNotNull(username);
+ this.password = password;
+ this.databaseList = checkNotNull(databaseList);
+ this.tableList = checkNotNull(tableList);
+ this.serverIdRange = serverIdRange;
+ this.startupOptions = checkNotNull(startupOptions);
+ this.splitSize = splitSize;
+ this.splitMetaGroupSize = splitMetaGroupSize;
+ this.fetchSize = fetchSize;
+ this.serverTimeZone = checkNotNull(serverTimeZone);
+ this.connectTimeout = checkNotNull(connectTimeout);
+ this.connectMaxRetries = connectMaxRetries;
+ this.connectionPoolSize = connectionPoolSize;
+ this.distributionFactorUpper = distributionFactorUpper;
+ this.distributionFactorLower = distributionFactorLower;
+ this.includeSchemaChanges = includeSchemaChanges;
+ this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
+ this.dbzProperties = checkNotNull(dbzProperties);
+ this.dbzConfiguration = Configuration.from(dbzProperties);
+ this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
+ this.jdbcProperties = jdbcProperties;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public List<String> getDatabaseList() {
+ return databaseList;
+ }
+
+ public List<String> getTableList() {
+ return tableList;
+ }
+
+ @Nullable
+ public ServerIdRange getServerIdRange() {
+ return serverIdRange;
+ }
+
+ public StartupOptions getStartupOptions() {
+ return startupOptions;
+ }
+
+ public int getSplitSize() {
+ return splitSize;
+ }
+
+ public int getSplitMetaGroupSize() {
+ return splitMetaGroupSize;
+ }
+
+ public double getDistributionFactorUpper() {
+ return distributionFactorUpper;
+ }
+
+ public double getDistributionFactorLower() {
+ return distributionFactorLower;
+ }
+
+ public int getFetchSize() {
+ return fetchSize;
+ }
+
+ public String getServerTimeZone() {
+ return serverTimeZone;
+ }
+
+ public Duration getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public int getConnectMaxRetries() {
+ return connectMaxRetries;
+ }
+
+ public int getConnectionPoolSize() {
+ return connectionPoolSize;
+ }
+
+ public boolean isIncludeSchemaChanges() {
+ return includeSchemaChanges;
+ }
+
+ public boolean isScanNewlyAddedTableEnabled() {
+ return scanNewlyAddedTableEnabled;
+ }
+
+ public Properties getDbzProperties() {
+ return dbzProperties;
+ }
+
+ public Configuration getDbzConfiguration() {
+ return dbzConfiguration;
+ }
+
+ public MySqlConnectorConfig getMySqlConnectorConfig() {
+ return dbzMySqlConfig;
+ }
+
+ public RelationalTableFilters getTableFilters() {
+ return dbzMySqlConfig.getTableFilters();
+ }
+
+ public Properties getJdbcProperties() {
+ return jdbcProperties;
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceOptions.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceOptions.java
new file mode 100644
index 000000000..9885cf74f
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/MySqlSourceOptions.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.config;
+
+import java.time.Duration;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class MySqlSourceOptions {
+
+ public static final ConfigOption<String> HOSTNAME =
+ ConfigOptions.key("hostname")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("IP address or hostname of the MySQL database server.");
+
+ public static final ConfigOption<Integer> PORT =
+ ConfigOptions.key("port")
+ .intType()
+ .defaultValue(3306)
+ .withDescription("Integer port number of the MySQL database server.");
+
+ public static final ConfigOption<String> USERNAME =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+                            "Name of the MySQL user to use when connecting to the MySQL database server.");
+
+ public static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Password to use when connecting to the MySQL database server.");
+
+ public static final ConfigOption<String> DATABASE_NAME =
+ ConfigOptions.key("database-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Database name of the MySQL server to monitor.");
+
+ public static final ConfigOption<String> TABLE_NAME =
+ ConfigOptions.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name of the MySQL database to monitor.");
+
+ public static final ConfigOption<String> SERVER_TIME_ZONE =
+ ConfigOptions.key("server-time-zone")
+ .stringType()
+ .defaultValue("UTC")
+                    .withDescription("The session time zone in the database server.");
+
+ public static final ConfigOption<String> SERVER_ID =
+ ConfigOptions.key("server-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+                            "A numeric ID or a numeric ID range of this database client. "
+                                    + "The numeric ID syntax is like '5400', and the numeric ID range syntax "
+                                    + "is like '5400-5408'. The numeric ID range syntax is recommended when "
+                                    + "'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all "
+ + "currently-running database processes in the MySQL cluster. This connector"
+ + " joins the MySQL cluster as another server (with this unique ID) "
+ + "so it can read the binlog. By default, a random number is generated between"
+ + " 5400 and 6400, though we recommend setting an explicit value.");
+
+ public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ENABLED =
+ ConfigOptions.key("scan.incremental.snapshot.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+                            "Incremental snapshot is a new mechanism to read the snapshot of a table. "
+ + "Compared to the old snapshot mechanism, the incremental "
+ + "snapshot has many advantages, including:\n"
+ + "(1) source can be parallel during snapshot reading, \n"
+ + "(2) source can perform checkpoints in the chunk "
+ + "granularity during snapshot reading, \n"
+ + "(3) source doesn't need to acquire global read lock "
+ + "(FLUSH TABLES WITH READ LOCK) before snapshot reading.\n"
+                                    + "If you would like the source to run in parallel, each parallel "
+                                    + "reader should have a unique server id, "
+ + "so the 'server-id' must be a range like '5400-6400', "
+ + "and the range must be larger than the parallelism.");
+
+ public static final ConfigOption<Integer> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+ ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+ .intType()
+ .defaultValue(8096)
+ .withDescription(
+                            "The chunk size (number of rows) of the table snapshot; "
+                                    + "captured tables are split into multiple "
+                                    + "chunks when reading the snapshot of a table.");
+
+ public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+ ConfigOptions.key("scan.snapshot.fetch.size")
+ .intType()
+ .defaultValue(1024)
+ .withDescription(
+                            "The maximum fetch size per poll when reading the table snapshot.");
+
+ public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+ ConfigOptions.key("connect.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+ "The maximum time that the connector should wait after "
+ + "trying to connect to the MySQL database server before timing out.");
+
+ public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+ ConfigOptions.key("connection.pool.size")
+ .intType()
+ .defaultValue(20)
+ .withDescription("The connection pool size.");
+
+ public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+ ConfigOptions.key("connect.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription(
+                            "The maximum number of times that the connector should retry to "
+                                    + "build a MySQL database server connection.");
+
+ public static final ConfigOption<String> SCAN_STARTUP_MODE =
+ ConfigOptions.key("scan.startup.mode")
+ .stringType()
+ .defaultValue("initial")
+ .withDescription(
+ "Optional startup mode for MySQL CDC consumer, valid "
+ + "enumerations are "
+ + "\"initial\", \"earliest-offset\", "
+                                    + "\"latest-offset\", \"timestamp\",\n"
+                                    + "or \"specific-offset\".");
+
+ public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE =
+ ConfigOptions.key("scan.startup.specific-offset.file")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional offsets used in case of \"specific-offset\" startup mode");
+
+ public static final ConfigOption<Integer> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+ ConfigOptions.key("scan.startup.specific-offset.pos")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional offsets used in case of \"specific-offset\" startup mode");
+
+ public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
+ ConfigOptions.key("scan.startup.timestamp-millis")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional timestamp used in case of \"timestamp\" startup mode");
+
+ public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+ ConfigOptions.key("heartbeat.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+                            "Optional interval of sending heartbeat event for tracking the "
+                                    + "latest available binlog offsets");
+
+ public static final ConfigOption<Boolean> APPEND_MODE =
+ ConfigOptions.key("append-mode")
+ .booleanType()
+ .defaultValue(false)
+                    .withDescription("Whether the source works as an append-only source.");
+
+ // ----------------------------------------------------------------------------
+ // experimental options, won't add them to documentation
+ // ----------------------------------------------------------------------------
+ @Experimental
+ public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
+ ConfigOptions.key("chunk-meta.group.size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+                            "The group size of chunk meta; if the meta size exceeds the "
+                                    + "group size, the meta will be divided into multiple groups.");
+
+ @Experimental
+ public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+ ConfigOptions.key("split-key.even-distribution.factor.upper-bound")
+ .doubleType()
+ .defaultValue(1000.0d)
+ .withDescription(
+ "The upper bound of split key distribution factor. The distribution "
+ + "factor is used to determine whether the"
+                                    + " table is evenly distributed or not."
+                                    + " The table chunks would use an even-split calculation optimization "
+                                    + "when the data distribution is even,"
+                                    + " and querying MySQL for splitting would happen when it is uneven."
+ + " The distribution factor could be calculated by (MAX(id) - "
+ + "MIN(id) + 1) / rowCount.");
+
+ @Experimental
+ public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+ ConfigOptions.key("split-key.even-distribution.factor.lower-bound")
+ .doubleType()
+ .defaultValue(0.05d)
+ .withDescription(
+ "The lower bound of split key distribution factor. The distribution "
+ + "factor is used to determine whether the"
+                                    + " table is evenly distributed or not."
+                                    + " The table chunks would use an even-split calculation optimization "
+                                    + "when the data distribution is even,"
+                                    + " and querying MySQL for splitting would happen when it is uneven."
+ + " The distribution factor could be calculated by (MAX(id) - "
+ + "MIN(id) + 1) / rowCount.");
+
+ @Experimental
+ public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
+ ConfigOptions.key("scan.newly-added-table.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+                            "Whether to scan the newly added tables or not; false by default.");
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/ServerIdRange.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/ServerIdRange.java
new file mode 100644
index 000000000..aff76f2aa
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/config/ServerIdRange.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.config;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import javax.annotation.Nullable;
+
+/**
+ * This class defines a range of server id. The boundaries of the range are inclusive.
+ *
+ * @see MySqlSourceOptions#SERVER_ID
+ */
+public class ServerIdRange implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ /** Start of the range (inclusive). */
+ private final int startServerId;
+
+ /** End of the range (inclusive). */
+ private final int endServerId;
+
+ public ServerIdRange(int startServerId, int endServerId) {
+ this.startServerId = startServerId;
+ this.endServerId = endServerId;
+ }
+
+ public int getStartServerId() {
+ return startServerId;
+ }
+
+ public int getEndServerId() {
+ return endServerId;
+ }
+
+ public int getServerId(int subTaskId) {
+ checkArgument(subTaskId >= 0, "Subtask ID %s shouldn't be a negative number.", subTaskId);
+        if (subTaskId >= getNumberOfServerIds()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Subtask ID %s is out of server id range %s, "
+                                    + "please adjust the server id range to "
+                                    + "make the number of server ids not less than "
+                                    + "the source parallelism.",
+ subTaskId, this.toString()));
+ }
+ return startServerId + subTaskId;
+ }
+
+ public int getNumberOfServerIds() {
+ return endServerId - startServerId + 1;
+ }
+
+ @Override
+ public String toString() {
+ if (startServerId == endServerId) {
+ return String.valueOf(startServerId);
+ } else {
+ return startServerId + "-" + endServerId;
+ }
+ }
+
+ /**
+     * Returns a {@link ServerIdRange} from a server id range string like '5400-5408' or a
+     * single server id like '5400'.
+ */
+ public static @Nullable ServerIdRange from(@Nullable String range) {
+ if (range == null) {
+ return null;
+ }
+ if (range.contains("-")) {
+ String[] idArray = range.split("-");
+ if (idArray.length != 2) {
+ throw new IllegalArgumentException(
+ String.format(
+                                "The server id range should use the syntax '5400-5500', but got: %s",
+ range));
+ }
+ return new ServerIdRange(
+ parseServerId(idArray[0].trim()), parseServerId(idArray[1].trim()));
+ } else {
+ int serverId = parseServerId(range);
+ return new ServerIdRange(serverId, serverId);
+ }
+ }
+
+ private static int parseServerId(String serverIdValue) {
+ try {
+ return Integer.parseInt(serverIdValue);
+ } catch (NumberFormatException e) {
+ throw new IllegalStateException(
+                    String.format("The server id %s is not a valid number.", serverIdValue), e);
+ }
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPoolId.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPoolId.java
new file mode 100644
index 000000000..8fcef7de9
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPoolId.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** The connection pool identifier. */
+public class ConnectionPoolId implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final String host;
+ private final int port;
+
+ public ConnectionPoolId(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ConnectionPoolId)) {
+ return false;
+ }
+ ConnectionPoolId that = (ConnectionPoolId) o;
+ return Objects.equals(host, that.host) && Objects.equals(port, that.port);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(host, port);
+ }
+
+ @Override
+ public String toString() {
+ return host + ':' + port;
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPools.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPools.java
new file mode 100644
index 000000000..ab3a6b36d
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/ConnectionPools.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.flink.annotation.Internal;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+
+/** JDBC connection pools consisting of {@link HikariDataSource} instances. */
+@Internal
+public interface ConnectionPools {
+
+ /**
+     * Gets a connection pool from the pools, creating a new pool if it does not exist in the
+     * connection pools.
+ */
+ HikariDataSource getOrCreateConnectionPool(
+ ConnectionPoolId poolId, MySqlSourceConfig sourceConfig);
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionFactory.java
new file mode 100644
index 000000000..6454cb50e
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection;
+
+import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import java.sql.Connection;
+import java.sql.SQLException;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A factory to create JDBC connections for MySQL. */
+public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionFactory.class);
+
+ private final MySqlSourceConfig sourceConfig;
+
+ public JdbcConnectionFactory(MySqlSourceConfig sourceConfig) {
+ this.sourceConfig = sourceConfig;
+ }
+
+ @Override
+ public Connection connect(JdbcConfiguration config) throws SQLException {
+ final int connectRetryTimes = sourceConfig.getConnectMaxRetries();
+
+ final ConnectionPoolId connectionPoolId =
+ new ConnectionPoolId(sourceConfig.getHostname(), sourceConfig.getPort());
+
+ HikariDataSource dataSource =
+ JdbcConnectionPools.getInstance()
+ .getOrCreateConnectionPool(connectionPoolId, sourceConfig);
+
+ int i = 0;
+ while (i < connectRetryTimes) {
+ try {
+ return dataSource.getConnection();
+ } catch (SQLException e) {
+ if (i < connectRetryTimes - 1) {
+ try {
+ Thread.sleep(300);
+ } catch (InterruptedException ie) {
+ throw new FlinkRuntimeException(
+ "Failed to get connection, interrupted while doing another attempt",
+ ie);
+ }
+                    LOG.warn("Failed to get connection, retrying (attempt {})", i + 1);
+ } else {
+                    LOG.error("Failed to get connection after {} attempts", i + 1);
+ throw new FlinkRuntimeException(e);
+ }
+ }
+ i++;
+ }
+ return dataSource.getConnection();
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionPools.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionPools.java
new file mode 100644
index 000000000..72801b509
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/JdbcConnectionPools.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection.PooledDataSourceFactory.createPooledDataSource;
+
+import com.zaxxer.hikari.HikariDataSource;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A JDBC connection pools implementation. */
+public class JdbcConnectionPools implements ConnectionPools {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class);
+
+ private static final JdbcConnectionPools INSTANCE = new JdbcConnectionPools();
+ private final Map<ConnectionPoolId, HikariDataSource> pools = new HashMap<>();
+
+ private JdbcConnectionPools() {
+ }
+
+ public static JdbcConnectionPools getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public HikariDataSource getOrCreateConnectionPool(
+ ConnectionPoolId poolId, MySqlSourceConfig sourceConfig) {
+ synchronized (pools) {
+ if (!pools.containsKey(poolId)) {
+ LOG.info("Create and register connection pool {}", poolId);
+ pools.put(poolId, createPooledDataSource(sourceConfig));
+ }
+ return pools.get(poolId);
+ }
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/PooledDataSourceFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/PooledDataSourceFactory.java
new file mode 100644
index 000000000..eafe67dbf
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/connection/PooledDataSourceFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.connection;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import java.util.Properties;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceConfig;
+
+/** A connection pool factory that creates a pooled DataSource, {@link HikariDataSource}. */
+public class PooledDataSourceFactory {
+
+ public static final String JDBC_URL_PATTERN =
+ "jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true";
+ public static final String CONNECTION_POOL_PREFIX = "connection-pool-";
+ public static final String SERVER_TIMEZONE_KEY = "serverTimezone";
+ public static final int MINIMUM_POOL_SIZE = 1;
+ private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties();
+
+ private PooledDataSourceFactory() {
+ }
+
+ public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceConfig) {
+ final HikariConfig config = new HikariConfig();
+
+ String hostName = sourceConfig.getHostname();
+ int port = sourceConfig.getPort();
+ Properties jdbcProperties = sourceConfig.getJdbcProperties();
+
+ config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port);
+ config.setJdbcUrl(formatJdbcUrl(hostName, port, jdbcProperties));
+ config.setUsername(sourceConfig.getUsername());
+ config.setPassword(sourceConfig.getPassword());
+ config.setMinimumIdle(MINIMUM_POOL_SIZE);
+ config.setMaximumPoolSize(sourceConfig.getConnectionPoolSize());
+ config.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis());
+ config.addDataSourceProperty(SERVER_TIMEZONE_KEY, sourceConfig.getServerTimeZone());
+ config.setDriverClassName(
+ sourceConfig.getDbzConfiguration().getString(MySqlConnectorConfig.JDBC_DRIVER));
+
+ // optional optimization configurations for pooled DataSource
+ config.addDataSourceProperty("cachePrepStmts", "true");
+ config.addDataSourceProperty("prepStmtCacheSize", "250");
+ config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
+
+ return new HikariDataSource(config);
+ }
+
+ private static String formatJdbcUrl(String hostName, int port, Properties jdbcProperties) {
+ Properties combinedProperties = new Properties();
+ combinedProperties.putAll(DEFAULT_JDBC_PROPERTIES);
+ combinedProperties.putAll(jdbcProperties);
+
+ StringBuilder jdbcUrlStringBuilder =
+ new StringBuilder(String.format(JDBC_URL_PATTERN, hostName, port));
+
+ combinedProperties.forEach(
+ (key, value) -> {
+ jdbcUrlStringBuilder.append("&").append(key).append("=").append(value);
+ });
+
+ return jdbcUrlStringBuilder.toString();
+ }
+
+ private static Properties initializeDefaultJdbcProperties() {
+ Properties defaultJdbcProperties = new Properties();
+ defaultJdbcProperties.setProperty("zeroDateTimeBehavior", "CONVERT_TO_NULL");
+ defaultJdbcProperties.setProperty("characterEncoding", "UTF-8");
+ defaultJdbcProperties.setProperty("characterSetResults", "UTF-8");
+ return defaultJdbcProperties;
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/JdbcUrlUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/JdbcUrlUtils.java
new file mode 100644
index 000000000..fdf83b600
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/JdbcUrlUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import java.util.Map;
+import java.util.Properties;
+
+/** Option utils for JDBC URL properties. */
+public class JdbcUrlUtils {
+
+ // Prefix for JDBC specific properties.
+ public static final String PROPERTIES_PREFIX = "jdbc.properties.";
+
+ public static Properties getJdbcProperties(Map<String, String> tableOptions) {
+ Properties jdbcProperties = new Properties();
+ if (hasJdbcProperties(tableOptions)) {
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+ .forEach(
+ key -> {
+ final String value = tableOptions.get(key);
+ final String subKey = key.substring((PROPERTIES_PREFIX).length());
+ jdbcProperties.put(subKey, value);
+ });
+ }
+ return jdbcProperties;
+ }
+
+ /**
+     * Decides if the table options contain JDBC properties that start with the prefix
+     * 'jdbc.properties.'.
+ */
+ private static boolean hasJdbcProperties(Map<String, String> tableOptions) {
+ return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlDeserializationConverterFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlDeserializationConverterFactory.java
new file mode 100644
index 000000000..d4b4c7858
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlDeserializationConverterFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import com.esri.core.geometry.ogc.OGCGeometry;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DeserializationRuntimeConverter;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DeserializationRuntimeConverterFactory;
+import io.debezium.data.EnumSet;
+import io.debezium.data.geometry.Geometry;
+import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.nio.ByteBuffer;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Used to create a {@link DeserializationRuntimeConverterFactory} specific to MySQL. */
+public class MySqlDeserializationConverterFactory {
+
+ public static DeserializationRuntimeConverterFactory instance() {
+ return new DeserializationRuntimeConverterFactory() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
+ LogicalType logicalType, ZoneId serverTimeZone) {
+ switch (logicalType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return createStringConverter();
+ case ARRAY:
+ return createArrayConverter((ArrayType) logicalType);
+ default:
+ // fallback to default converter
+ return Optional.empty();
+ }
+ }
+ };
+ }
+
+ private static Optional<DeserializationRuntimeConverter> createStringConverter() {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ final ObjectWriter objectWriter = objectMapper.writer();
+ return Optional.of(
+ new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) throws Exception {
+ // the Geometry datatype in MySQL will be converted to
+                        // a String in JSON format
+ if (Point.LOGICAL_NAME.equals(schema.name())
+ || Geometry.LOGICAL_NAME.equals(schema.name())) {
+ try {
+ Struct geometryStruct = (Struct) dbzObj;
+ byte[] wkb = geometryStruct.getBytes("wkb");
+ String geoJson =
+ OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
+ JsonNode originGeoNode = objectMapper.readTree(geoJson);
+ Optional<Integer> srid =
+ Optional.ofNullable(geometryStruct.getInt32("srid"));
+ Map<String, Object> geometryInfo = new HashMap<>();
+ String geometryType = originGeoNode.get("type").asText();
+ geometryInfo.put("type", geometryType);
+ if (geometryType.equals("GeometryCollection")) {
+ geometryInfo.put("geometries", originGeoNode.get("geometries"));
+ } else {
+ geometryInfo.put(
+ "coordinates", originGeoNode.get("coordinates"));
+ }
+ geometryInfo.put("srid", srid.orElse(0));
+ return StringData.fromString(
+ objectWriter.writeValueAsString(geometryInfo));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Failed to convert %s to geometry JSON.", dbzObj),
+ e);
+ }
+ } else {
+ return StringData.fromString(dbzObj.toString());
+ }
+ }
+ });
+ }
+
+ private static Optional<DeserializationRuntimeConverter> createArrayConverter(
+ ArrayType arrayType) {
+ if (LogicalTypeChecks.hasFamily(
+ arrayType.getElementType(), LogicalTypeFamily.CHARACTER_STRING)) {
+ // only map MySQL SET type to Flink ARRAY<STRING> type
+ return Optional.of(
+ new DeserializationRuntimeConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) throws Exception {
+ if (EnumSet.LOGICAL_NAME.equals(schema.name())
+ && dbzObj instanceof String) {
+                                // for the SET datatype in MySQL, Debezium always
+                                // returns a comma-separated string like "a,b,c"
+ String[] enums = ((String) dbzObj).split(",");
+ StringData[] elements = new StringData[enums.length];
+ for (int i = 0; i < enums.length; i++) {
+ elements[i] = StringData.fromString(enums[i]);
+ }
+ return new GenericArrayData(elements);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+                                                "Unable to convert to Flink ARRAY type from unexpected value '%s', "
+                                                        + "only the SET type can be converted to ARRAY type for MySQL",
+ dbzObj));
+ }
+ }
+ });
+ } else {
+ // otherwise, fallback to default converter
+ return Optional.empty();
+ }
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java
new file mode 100644
index 000000000..fc456f929
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** Defines the supported metadata columns for {@link MySqlTableSource}. */
+public enum MySqlReadableMetadata {
+    /** Name of the table that contains the row. */
+ TABLE_NAME(
+ "table_name",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+ return StringData.fromString(
+ sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+ }
+ }),
+
+    /** Name of the database that contains the row. */
+ DATABASE_NAME(
+ "database_name",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+ return StringData.fromString(
+ sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+ }
+ }),
+
+ /**
+ * It indicates the time that the change was made in the database. If the record is read from
+     * the snapshot of the table instead of the binlog, the value is always 0.
+ */
+ OP_TS(
+ "op_ts",
+ DataTypes.TIMESTAMP_LTZ(3).notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+ return TimestampData.fromEpochMillis(
+ (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+ }
+ }),
+
+    /** Name of the table that contains the row. */
+ META_TABLE_NAME(
+ "meta.table_name",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+ return StringData.fromString(
+ sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+ }
+ }),
+
+    /** Name of the database that contains the row. */
+ META_DATABASE_NAME(
+ "meta.database_name",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+ return StringData.fromString(
+ sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+ }
+ }),
+
+ /**
+ * It indicates the time that the change was made in the database. If the record is read from
+     * the snapshot of the table instead of the binlog, the value is always 0.
+ */
+ META_OP_TS(
+ "meta.op_ts",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+ return TimestampData.fromEpochMillis(
+ (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+ }
+ }),
+
+ /** Operation type, INSERT/UPDATE/DELETE. */
+ OP_TYPE(
+ "meta.op_type",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ final Envelope.Operation op = Envelope.operationFor(record);
+ if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+ return StringData.fromString("INSERT");
+ } else if (op == Envelope.Operation.DELETE) {
+ return StringData.fromString("DELETE");
+ } else {
+ return StringData.fromString("UPDATE");
+ }
+ }
+ }),
+
+    /** A simple incrementing counter; not semantically important. */
+ BATCH_ID(
+ "meta.batch_id",
+ DataTypes.BIGINT().nullable(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ private long id = 0;
+
+ @Override
+ public Object read(SourceRecord record) {
+ return id++;
+ }
+ }),
+
+    /** The source does not emit DDL data. */
+ IS_DDL(
+ "meta.is_ddl",
+ DataTypes.BOOLEAN().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ return false;
+ }
+ }),
+
+    /** The update-before data for an UPDATE record. */
+ OLD(
+ "meta.update_before",
+ DataTypes.ARRAY(
+ DataTypes.MAP(
+ DataTypes.STRING().nullable(),
+ DataTypes.STRING().nullable())
+ .nullable())
+ .nullable(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ final Envelope.Operation op = Envelope.operationFor(record);
+ if (op != Envelope.Operation.UPDATE) {
+ return null;
+ }
+ return record;
+ }
+ }),
+
+ MYSQL_TYPE(
+ "meta.mysql_type",
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ return null;
+ }
+
+ @Override
+ public Object read(
+ SourceRecord record, @Nullable TableChanges.TableChange tableSchema) {
+ if (tableSchema == null) {
+ return null;
+ }
+ Map<StringData, StringData> mysqlType = new HashMap<>();
+ final Table table = tableSchema.getTable();
+ table.columns()
+ .forEach(
+ column -> {
+ mysqlType.put(
+ StringData.fromString(column.name()),
+ StringData.fromString(
+ String.format(
+ "%s(%d)",
+ column.typeName(),
+ column.length())));
+ });
+
+ return new GenericMapData(mysqlType);
+ }
+ }),
+
+ PK_NAMES(
+ "meta.pk_names",
+ DataTypes.ARRAY(DataTypes.STRING().nullable()).nullable(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ return null;
+ }
+
+ @Override
+ public Object read(
+ SourceRecord record, @Nullable TableChanges.TableChange tableSchema) {
+ if (tableSchema == null) {
+ return null;
+ }
+ return new GenericArrayData(
+ tableSchema.getTable().primaryKeyColumnNames().stream()
+ .map(StringData::fromString)
+ .toArray());
+ }
+ }),
+
+ SQL(
+ "meta.sql",
+ DataTypes.STRING().nullable(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ return StringData.fromString("");
+ }
+ }),
+
+ SQL_TYPE(
+ "meta.sql_type",
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable()).nullable(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ return null;
+ }
+
+ @Override
+ public Object read(
+ SourceRecord record, @Nullable TableChanges.TableChange tableSchema) {
+ if (tableSchema == null) {
+ return null;
+ }
+ Map<StringData, Integer> mysqlType = new HashMap<>();
+ final Table table = tableSchema.getTable();
+ table.columns()
+ .forEach(
+ column -> {
+ mysqlType.put(
+ StringData.fromString(column.name()),
+ column.jdbcType());
+ });
+
+ return new GenericMapData(mysqlType);
+ }
+ }),
+
+ TS(
+ "meta.ts",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record) {
+ Struct messageStruct = (Struct) record.value();
+ return TimestampData.fromEpochMillis(
+ (Long) messageStruct.get(Envelope.FieldName.TIMESTAMP));
+ }
+ });
+
+ private final String key;
+
+ private final DataType dataType;
+
+ private final MetadataConverter converter;
+
+ MySqlReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ public MetadataConverter getConverter() {
+ return converter;
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
new file mode 100644
index 000000000..d4611c1e5
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a MySQL binlog source from a logical
+ * description.
+ */
+public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+ private final ResolvedSchema physicalSchema;
+ private final int port;
+ private final String hostname;
+ private final String database;
+ private final String username;
+ private final String password;
+ private final String serverId;
+ private final String tableName;
+ private final ZoneId serverTimeZone;
+ private final Properties dbzProperties;
+ private final boolean enableParallelRead;
+ private final int splitSize;
+ private final int splitMetaGroupSize;
+ private final int fetchSize;
+ private final Duration connectTimeout;
+ private final int connectionPoolSize;
+ private final int connectMaxRetries;
+ private final double distributionFactorUpper;
+ private final double distributionFactorLower;
+ private final StartupOptions startupOptions;
+ private final boolean scanNewlyAddedTableEnabled;
+ private final Properties jdbcProperties;
+ private final Duration heartbeatInterval;
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ protected DataType producedDataType;
+
+ /** Metadata that is appended at the end of a physical source row. */
+ protected List<String> metadataKeys;
+
+ public MySqlTableSource(
+ ResolvedSchema physicalSchema,
+ int port,
+ String hostname,
+ String database,
+ String tableName,
+ String username,
+ String password,
+ ZoneId serverTimeZone,
+ Properties dbzProperties,
+ @Nullable String serverId,
+ boolean enableParallelRead,
+ int splitSize,
+ int splitMetaGroupSize,
+ int fetchSize,
+ Duration connectTimeout,
+ int connectMaxRetries,
+ int connectionPoolSize,
+ double distributionFactorUpper,
+ double distributionFactorLower,
+ StartupOptions startupOptions,
+ Duration heartbeatInterval) {
+ this(
+ physicalSchema,
+ port,
+ hostname,
+ database,
+ tableName,
+ username,
+ password,
+ serverTimeZone,
+ dbzProperties,
+ serverId,
+ enableParallelRead,
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ startupOptions,
+ false,
+ new Properties(),
+ heartbeatInterval);
+ }
+
+ public MySqlTableSource(
+ ResolvedSchema physicalSchema,
+ int port,
+ String hostname,
+ String database,
+ String tableName,
+ String username,
+ String password,
+ ZoneId serverTimeZone,
+ Properties dbzProperties,
+ @Nullable String serverId,
+ boolean enableParallelRead,
+ int splitSize,
+ int splitMetaGroupSize,
+ int fetchSize,
+ Duration connectTimeout,
+ int connectMaxRetries,
+ int connectionPoolSize,
+ double distributionFactorUpper,
+ double distributionFactorLower,
+ StartupOptions startupOptions,
+ boolean scanNewlyAddedTableEnabled,
+ Properties jdbcProperties,
+ Duration heartbeatInterval) {
+ this.physicalSchema = physicalSchema;
+ this.port = port;
+ this.hostname = checkNotNull(hostname);
+ this.database = checkNotNull(database);
+ this.tableName = checkNotNull(tableName);
+ this.username = checkNotNull(username);
+ this.password = checkNotNull(password);
+ this.serverId = serverId;
+ this.serverTimeZone = serverTimeZone;
+ this.dbzProperties = dbzProperties;
+ this.enableParallelRead = enableParallelRead;
+ this.splitSize = splitSize;
+ this.splitMetaGroupSize = splitMetaGroupSize;
+ this.fetchSize = fetchSize;
+ this.connectTimeout = connectTimeout;
+ this.connectMaxRetries = connectMaxRetries;
+ this.connectionPoolSize = connectionPoolSize;
+ this.distributionFactorUpper = distributionFactorUpper;
+ this.distributionFactorLower = distributionFactorLower;
+ this.startupOptions = startupOptions;
+ this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
+ this.jdbcProperties = jdbcProperties;
+ // Mutable attributes
+ this.producedDataType = physicalSchema.toPhysicalRowDataType();
+ this.metadataKeys = Collections.emptyList();
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ RowType physicalDataType =
+ (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+ MetadataConverter[] metadataConverters = getMetadataConverters();
+ final TypeInformation<RowData> typeInfo =
+ scanContext.createTypeInformation(producedDataType);
+
+ DebeziumDeserializationSchema<RowData> deserializer =
+ RowDataDebeziumDeserializeSchema.newBuilder()
+ .setPhysicalRowType(physicalDataType)
+ .setMetadataConverters(metadataConverters)
+ .setResultTypeInfo(typeInfo)
+ .setServerTimeZone(serverTimeZone)
+ .setUserDefinedConverterFactory(
+ MySqlDeserializationConverterFactory.instance())
+ .build();
+
+ org.apache.inlong.sort.singletenant.flink.cdc.mysql.MySqlSource.Builder builder =
+ org.apache.inlong.sort.singletenant.flink.cdc.mysql.MySqlSource.<RowData>builder()
+ .hostname(hostname)
+ .port(port)
+ .databaseList(database)
+ .tableList(database + "." + tableName)
+ .username(username)
+ .password(password)
+ .serverTimeZone(serverTimeZone.toString())
+ .debeziumProperties(dbzProperties)
+ .startupOptions(startupOptions)
+ .deserializer(deserializer);
+        Optional.ofNullable(serverId)
+                .ifPresent(sid -> builder.serverId(Integer.parseInt(sid)));
+ DebeziumSourceFunction<RowData> sourceFunction = builder.build();
+ return SourceFunctionProvider.of(sourceFunction, false);
+ }
+
+ protected MetadataConverter[] getMetadataConverters() {
+ if (metadataKeys.isEmpty()) {
+ return new MetadataConverter[0];
+ }
+
+ return metadataKeys.stream()
+ .map(
+ key ->
+ Stream.of(MySqlReadableMetadata.values())
+ .filter(m -> m.getKey().equals(key))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .map(MySqlReadableMetadata::getConverter)
+ .toArray(MetadataConverter[]::new);
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ return Stream.of(MySqlReadableMetadata.values())
+ .collect(
+ Collectors.toMap(
+ MySqlReadableMetadata::getKey, MySqlReadableMetadata::getDataType));
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ this.metadataKeys = metadataKeys;
+ this.producedDataType = producedDataType;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ MySqlTableSource source =
+ new MySqlTableSource(
+ physicalSchema,
+ port,
+ hostname,
+ database,
+ tableName,
+ username,
+ password,
+ serverTimeZone,
+ dbzProperties,
+ serverId,
+ enableParallelRead,
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ startupOptions,
+ scanNewlyAddedTableEnabled,
+ jdbcProperties,
+ heartbeatInterval);
+ source.metadataKeys = metadataKeys;
+ source.producedDataType = producedDataType;
+ return source;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof MySqlTableSource)) {
+ return false;
+ }
+ MySqlTableSource that = (MySqlTableSource) o;
+ return port == that.port
+ && enableParallelRead == that.enableParallelRead
+ && splitSize == that.splitSize
+ && splitMetaGroupSize == that.splitMetaGroupSize
+ && fetchSize == that.fetchSize
+ && distributionFactorUpper == that.distributionFactorUpper
+ && distributionFactorLower == that.distributionFactorLower
+ && scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled
+ && Objects.equals(physicalSchema, that.physicalSchema)
+ && Objects.equals(hostname, that.hostname)
+ && Objects.equals(database, that.database)
+ && Objects.equals(username, that.username)
+ && Objects.equals(password, that.password)
+ && Objects.equals(serverId, that.serverId)
+ && Objects.equals(tableName, that.tableName)
+ && Objects.equals(serverTimeZone, that.serverTimeZone)
+ && Objects.equals(dbzProperties, that.dbzProperties)
+ && Objects.equals(connectTimeout, that.connectTimeout)
+ && Objects.equals(connectMaxRetries, that.connectMaxRetries)
+ && Objects.equals(connectionPoolSize, that.connectionPoolSize)
+ && Objects.equals(startupOptions, that.startupOptions)
+ && Objects.equals(producedDataType, that.producedDataType)
+ && Objects.equals(metadataKeys, that.metadataKeys)
+ && Objects.equals(jdbcProperties, that.jdbcProperties);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ physicalSchema,
+ port,
+ hostname,
+ database,
+ username,
+ password,
+ serverId,
+ tableName,
+ serverTimeZone,
+ dbzProperties,
+ enableParallelRead,
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ startupOptions,
+ producedDataType,
+ metadataKeys,
+ scanNewlyAddedTableEnabled,
+ jdbcProperties);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "MySQL-CDC";
+ }
+}
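
For reference, the builder wiring at the top of getScanRuntimeProvider() above can also be
used directly in a DataStream job. A minimal sketch, assuming placeholder connection values
and any DebeziumDeserializationSchema<RowData> implementation passed in by the caller
(neither is part of this commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
    import org.apache.inlong.sort.singletenant.flink.cdc.mysql.MySqlSource;
    import org.apache.inlong.sort.singletenant.flink.cdc.mysql.table.StartupOptions;

    static void runSketch(
            StreamExecutionEnvironment env,
            DebeziumDeserializationSchema<RowData> deserializer) throws Exception {
        DebeziumSourceFunction<RowData> source =
                MySqlSource.<RowData>builder()
                        .hostname("localhost")        // placeholder host
                        .port(3306)                   // default MySQL port
                        .databaseList("test_db")      // placeholder database
                        .tableList("test_db.orders")  // fully qualified table name
                        .username("inlong")           // placeholder credentials
                        .password("secret")
                        .serverTimeZone("UTC")
                        .startupOptions(StartupOptions.initial())
                        .deserializer(deserializer)   // supplied by the caller
                        .build();
        env.addSource(source).print();
        env.execute("mysql-cdc-sketch");
    }
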
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSourceFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSourceFactory.java
new file mode 100644
index 000000000..4ac6f90ab
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSourceFactory.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.ServerIdRange;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DebeziumOptions;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.DATABASE_NAME;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.ObjectUtils.doubleCompare;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.CONNECT_TIMEOUT;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.HOSTNAME;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.PASSWORD;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.PORT;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_STARTUP_MODE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SERVER_ID;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SERVER_TIME_ZONE;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.TABLE_NAME;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.config.MySqlSourceOptions.USERNAME;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Factory for creating configured instance of {@link MySqlTableSource}. */
+public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
+
+ private static final String IDENTIFIER = "mysql-cdc";
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(
+ DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JdbcUrlUtils.PROPERTIES_PREFIX);
+
+ final ReadableConfig config = helper.getOptions();
+ final String hostname = config.get(HOSTNAME);
+ final String username = config.get(USERNAME);
+ final String password = config.get(PASSWORD);
+ final String databaseName = config.get(DATABASE_NAME);
+ validateRegex(DATABASE_NAME.key(), databaseName);
+ final String tableName = config.get(TABLE_NAME);
+ validateRegex(TABLE_NAME.key(), tableName);
+ int port = config.get(PORT);
+ int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+ int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
+ int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
+ ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
+
+ ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+ String serverId = validateAndGetServerId(config);
+ StartupOptions startupOptions = getStartupOptions(config);
+ Duration connectTimeout = config.get(CONNECT_TIMEOUT);
+ int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
+ int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
+ double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+ double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+ boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+ Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
+
+ boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+ if (enableParallelRead) {
+ validatePrimaryKeyIfEnableParallel(physicalSchema);
+ validateStartupOptionIfEnableParallel(startupOptions);
+ validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
+ validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
+ validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
+ validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
+ validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
+ validateDistributionFactorUpper(distributionFactorUpper);
+ validateDistributionFactorLower(distributionFactorLower);
+ }
+
+ return new MySqlTableSource(
+ physicalSchema,
+ port,
+ hostname,
+ databaseName,
+ tableName,
+ username,
+ password,
+ serverTimeZone,
+ getDebeziumProperties(context.getCatalogTable().getOptions()),
+ serverId,
+ enableParallelRead,
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ startupOptions,
+ scanNewlyAddedTableEnabled,
+ JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
+ heartbeatInterval);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(HOSTNAME);
+ options.add(USERNAME);
+ options.add(PASSWORD);
+ options.add(DATABASE_NAME);
+ options.add(TABLE_NAME);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PORT);
+ options.add(SERVER_TIME_ZONE);
+ options.add(SERVER_ID);
+ options.add(SCAN_STARTUP_MODE);
+ options.add(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+ options.add(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+ options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+ options.add(CHUNK_META_GROUP_SIZE);
+ options.add(SCAN_SNAPSHOT_FETCH_SIZE);
+ options.add(CONNECT_TIMEOUT);
+ options.add(CONNECTION_POOL_SIZE);
+ options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+ options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+ options.add(CONNECT_MAX_RETRIES);
+ options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+ options.add(HEARTBEAT_INTERVAL);
+ return options;
+ }
+
+ private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+ private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+ private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+ private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
+ private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
+
+ private static StartupOptions getStartupOptions(ReadableConfig config) {
+ String modeString = config.get(SCAN_STARTUP_MODE);
+
+ switch (modeString.toLowerCase()) {
+ case SCAN_STARTUP_MODE_VALUE_INITIAL:
+ return StartupOptions.initial();
+
+ case SCAN_STARTUP_MODE_VALUE_LATEST:
+ return StartupOptions.latest();
+
+ case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+ case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET:
+ case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
+ throw new ValidationException(
+ String.format(
+                                "Unsupported option value '%s'. The startup modes [%s, %s, %s] "
+                                        + "are not yet supported correctly; "
+                                        + "please do not use them until they are",
+ modeString,
+ SCAN_STARTUP_MODE_VALUE_EARLIEST,
+ SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
+ SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+
+ default:
+ throw new ValidationException(
+ String.format(
+ "Invalid value for option '%s'. Supported values are [%s, %s], but was: %s",
+ SCAN_STARTUP_MODE.key(),
+ SCAN_STARTUP_MODE_VALUE_INITIAL,
+ SCAN_STARTUP_MODE_VALUE_LATEST,
+ modeString));
+ }
+ }
+
+ private void validatePrimaryKeyIfEnableParallel(ResolvedSchema physicalSchema) {
+ if (!physicalSchema.getPrimaryKey().isPresent()) {
+ throw new ValidationException(
+ String.format(
+                            "A primary key is required when '%s' is set to 'true'",
+ SCAN_INCREMENTAL_SNAPSHOT_ENABLED));
+ }
+ }
+
+ private void validateStartupOptionIfEnableParallel(StartupOptions startupOptions) {
+ // validate mode
+ Preconditions.checkState(
+ startupOptions.startupMode == StartupMode.INITIAL
+ || startupOptions.startupMode == StartupMode.LATEST_OFFSET,
+ String.format(
+                        "MySql Parallel Source only supports startup modes 'initial' and 'latest-offset',"
+                                + " but the actual mode is %s",
+ startupOptions.startupMode));
+ }
+
+ private String validateAndGetServerId(ReadableConfig configuration) {
+ final String serverIdValue = configuration.get(SERVER_ID);
+ if (serverIdValue != null) {
+ // validation
+ try {
+ ServerIdRange.from(serverIdValue);
+ } catch (Exception e) {
+ throw new ValidationException(
+ String.format(
+ "The value of option 'server-id' is invalid: '%s'", serverIdValue),
+ e);
+ }
+ }
+ return serverIdValue;
+ }
+
+    /** Checks that the value of the given integer option is valid. */
+ private void validateIntegerOption(
+ ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
+ checkState(
+ optionValue > exclusiveMin,
+ String.format(
+                        "The value of option '%s' must be larger than %d, but is %d",
+ option.key(), exclusiveMin, optionValue));
+ }
+
+ /**
+     * Checks that the given regular expression's syntax is valid.
+ *
+ * @param optionName the option name of the regex
+ * @param regex The regular expression to be checked
+ * @throws ValidationException If the expression's syntax is invalid
+ */
+ private void validateRegex(String optionName, String regex) {
+ try {
+ Pattern.compile(regex);
+ } catch (Exception e) {
+ throw new ValidationException(
+ String.format(
+ "The %s '%s' is not a valid regular expression", optionName, regex),
+ e);
+ }
+ }
+
+    /** Checks that the given distribution factor upper bound is valid. */
+ private void validateDistributionFactorUpper(double distributionFactorUpper) {
+ checkState(
+ doubleCompare(distributionFactorUpper, 1.0d) >= 0,
+ String.format(
+                        "The value of option '%s' must be larger than or equal to %s, but is %s",
+ SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
+ 1.0d,
+ distributionFactorUpper));
+ }
+
+    /** Checks that the given distribution factor lower bound is valid. */
+ private void validateDistributionFactorLower(double distributionFactorLower) {
+ checkState(
+ doubleCompare(distributionFactorLower, 0.0d) >= 0
+ && doubleCompare(distributionFactorLower, 1.0d) <= 0,
+ String.format(
+                        "The value of option '%s' must be between %s and %s inclusive, but is %s",
+ SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
+ 0.0d,
+ 1.0d,
+ distributionFactorLower));
+ }
+}
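
For reference, the factory above is discovered by the planner through the 'mysql-cdc'
identifier. A minimal sketch of a DDL that exercises it, assuming the option keys defined in
MySqlSourceOptions follow the upstream Flink CDC connector ('hostname', 'database-name',
'table-name', ...) and using placeholder connection values:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
    tEnv.executeSql(
            "CREATE TABLE orders (\n"
                    + "  id BIGINT,\n"
                    + "  amount DECIMAL(10, 2),\n"
                    + "  PRIMARY KEY (id) NOT ENFORCED\n" // required when incremental snapshot is enabled
                    + ") WITH (\n"
                    + "  'connector' = 'mysql-cdc',\n"    // matches IDENTIFIER above
                    + "  'hostname' = 'localhost',\n"
                    + "  'database-name' = 'test_db',\n"
                    + "  'table-name' = 'orders',\n"
                    + "  'username' = 'inlong',\n"
                    + "  'password' = 'secret',\n"
                    + "  'scan.startup.mode' = 'initial'\n"
                    + ")");
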
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupMode.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupMode.java
new file mode 100644
index 000000000..3ffc2bef3
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupMode.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+/**
+ * Startup modes for the MySQL CDC Consumer.
+ *
+ * @see StartupOptions
+ */
+public enum StartupMode {
+ INITIAL,
+
+ EARLIEST_OFFSET,
+
+ LATEST_OFFSET,
+
+ SPECIFIC_OFFSETS,
+
+ TIMESTAMP
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupOptions.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupOptions.java
new file mode 100644
index 000000000..67edcf744
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/StartupOptions.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Debezium startup options. */
+public final class StartupOptions implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public final StartupMode startupMode;
+ public final String specificOffsetFile;
+ public final Integer specificOffsetPos;
+ public final Long startupTimestampMillis;
+
+ /**
+     * Performs an initial snapshot on the monitored database tables upon first startup, and
+     * continues to read the latest binlog.
+ */
+ public static StartupOptions initial() {
+ return new StartupOptions(StartupMode.INITIAL, null, null, null);
+ }
+
+ /**
+     * Never performs a snapshot of the monitored database tables upon first startup; instead
+     * reads from the beginning of the binlog. This should be used with care, as it is only valid when the
+ * binlog is guaranteed to contain the entire history of the database.
+ */
+ public static StartupOptions earliest() {
+ return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, null);
+ }
+
+ /**
+     * Never performs a snapshot of the monitored database tables upon first startup; instead
+     * reads from the end of the binlog, so only changes made after the connector started are captured.
+ */
+ public static StartupOptions latest() {
+ return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
+ }
+
+ /**
+     * Never performs a snapshot of the monitored database tables upon first startup, and directly
+     * reads the binlog from the specified offset.
+ */
+ public static StartupOptions specificOffset(String specificOffsetFile, int specificOffsetPos) {
+ return new StartupOptions(
+ StartupMode.SPECIFIC_OFFSETS, specificOffsetFile, specificOffsetPos, null);
+ }
+
+ /**
+     * Never performs a snapshot of the monitored database tables upon first startup, and directly
+     * reads the binlog from the specified timestamp.
+ *
+ * <p>The consumer will traverse the binlog from the beginning and ignore change events whose
+ * timestamp is smaller than the specified timestamp.
+ *
+ * @param startupTimestampMillis timestamp for the startup offsets, as milliseconds from epoch.
+ */
+ public static StartupOptions timestamp(long startupTimestampMillis) {
+ return new StartupOptions(StartupMode.TIMESTAMP, null, null, startupTimestampMillis);
+ }
+
+ private StartupOptions(
+ StartupMode startupMode,
+ String specificOffsetFile,
+ Integer specificOffsetPos,
+ Long startupTimestampMillis) {
+ this.startupMode = startupMode;
+ this.specificOffsetFile = specificOffsetFile;
+ this.specificOffsetPos = specificOffsetPos;
+ this.startupTimestampMillis = startupTimestampMillis;
+
+ switch (startupMode) {
+ case INITIAL:
+ case EARLIEST_OFFSET:
+ case LATEST_OFFSET:
+ break;
+ case SPECIFIC_OFFSETS:
+ checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't be null");
+ checkNotNull(specificOffsetPos, "specificOffsetPos shouldn't be null");
+ break;
+ case TIMESTAMP:
+ checkNotNull(startupTimestampMillis, "startupTimestampMillis shouldn't be null");
+ break;
+ default:
+ throw new UnsupportedOperationException(startupMode + " mode is not supported.");
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StartupOptions that = (StartupOptions) o;
+ return startupMode == that.startupMode
+ && Objects.equals(specificOffsetFile, that.specificOffsetFile)
+ && Objects.equals(specificOffsetPos, that.specificOffsetPos)
+ && Objects.equals(startupTimestampMillis, that.startupTimestampMillis);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ startupMode, specificOffsetFile, specificOffsetPos, startupTimestampMillis);
+ }
+}
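
For reference, the private constructor above means StartupOptions can only be obtained through
the factory methods, which validate the mode-specific fields. A short illustration (note that
the table factory's getStartupOptions() above currently rejects the 'specific-offset' and
'timestamp' modes; the binlog file name below is a placeholder):

    StartupOptions initial = StartupOptions.initial();  // snapshot first, then read the binlog
    StartupOptions latest = StartupOptions.latest();    // read only changes from now on
    // Supported by the API, but not yet exposed through the 'mysql-cdc' table factory:
    StartupOptions offset = StartupOptions.specificOffset("mysql-bin.000003", 4);
    StartupOptions ts = StartupOptions.timestamp(1_650_000_000_000L);
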