Posted to commits@inlong.apache.org by zi...@apache.org on 2022/04/26 02:30:53 UTC
[incubator-inlong] branch master updated: [INLONG-3924][Sort] Add mysql cdc, support multiple meta data (#3924)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b09093a88 [INLONG-3924][Sort] Add mysql cdc, support multiple meta data (#3924)
b09093a88 is described below
commit b09093a88e6c7c359642a364fa30ed2db6a7a2e4
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Tue Apr 26 10:30:47 2022 +0800
[INLONG-3924][Sort] Add mysql cdc, support multiple meta data (#3924)
---
inlong-sort/sort-single-tenant/pom.xml | 30 +
.../debezium/DebeziumDeserializationSchema.java | 42 ++
.../flink/cdc/debezium/DebeziumSourceFunction.java | 576 ++++++++++++++++++
.../JsonDebeziumDeserializationSchema.java | 98 ++++
.../StringDebeziumDeserializationSchema.java | 49 ++
.../singletenant/flink/cdc/debezium/Validator.java | 33 ++
.../history/FlinkJsonTableChangeSerializer.java | 207 +++++++
.../debezium/internal/DebeziumChangeConsumer.java | 103 ++++
.../debezium/internal/DebeziumChangeFetcher.java | 309 ++++++++++
.../cdc/debezium/internal/DebeziumOffset.java | 64 ++
.../internal/DebeziumOffsetSerializer.java | 41 ++
.../debezium/internal/FlinkDatabaseHistory.java | 116 ++++
.../internal/FlinkDatabaseSchemaHistory.java | 199 +++++++
.../debezium/internal/FlinkOffsetBackingStore.java | 201 +++++++
.../flink/cdc/debezium/internal/Handover.java | 194 ++++++
.../flink/cdc/debezium/internal/SchemaRecord.java | 95 +++
.../debezium/table/AppendMetadataCollector.java | 63 ++
.../flink/cdc/debezium/table/DebeziumOptions.java | 53 ++
.../table/DeserializationRuntimeConverter.java | 31 +
.../DeserializationRuntimeConverterFactory.java | 47 ++
.../cdc/debezium/table/MetadataConverter.java | 36 ++
.../table/RowDataDebeziumDeserializeSchema.java | 651 +++++++++++++++++++++
.../cdc/debezium/utils/DatabaseHistoryUtil.java | 86 +++
.../cdc/debezium/utils/TemporalConversions.java | 216 +++++++
pom.xml | 32 +-
25 files changed, 3571 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index f1b831651..29eb8ead2 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -31,6 +31,11 @@
<artifactId>sort-single-tenant</artifactId>
<name>Apache InLong - Sort Single Tenant</name>
+ <properties>
+ <flink-connector-debezium.version>2.0.1</flink-connector-debezium.version>
+ <debezium-connector-mysql.version>1.5.4.Final</debezium-connector-mysql.version>
+ <debezium-core.version>1.5.4.Final</debezium-core.version>
+ </properties>
<packaging>jar</packaging>
<dependencies>
@@ -181,6 +186,31 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-connector-debezium</artifactId>
+ <version>${flink-connector-debezium.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>kafka-log4j-appender</artifactId>
+ <groupId>org.apache.kafka</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-mysql</artifactId>
+ <version>${debezium-connector-mysql.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ <version>${debezium-core.version}</version>
+ </dependency>
+
</dependencies>
<build>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java
new file mode 100644
index 000000000..a23fe10e3
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumDeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.io.Serializable;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * The deserialization schema describes how to turn the Debezium SourceRecord into data types
+ * (Java/Scala objects) that are processed by Flink.
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+@PublicEvolving
+public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+ /** Deserialize the Debezium record, which is represented as a Kafka {@link SourceRecord}. */
+ void deserialize(SourceRecord record, Collector<T> out) throws Exception;
+
+ /** Deserialize the Debezium record together with the {@link TableChange} describing its table schema. */
+ void deserialize(SourceRecord record, Collector<T> out, TableChange tableChange) throws Exception;
+
+}
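For illustration, a minimal implementation of this interface only needs to forward values to the
collector. The sketch below is hypothetical (not part of this patch): it emits each record's topic
name and ignores the table-change metadata:

    import io.debezium.relational.history.TableChanges.TableChange;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.source.SourceRecord;

    public class TopicNameDeserializationSchema implements DebeziumDeserializationSchema<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public void deserialize(SourceRecord record, Collector<String> out) {
            // Debezium derives the topic from serverName.databaseName.tableName.
            out.collect(record.topic());
        }

        @Override
        public void deserialize(SourceRecord record, Collector<String> out, TableChange tableChange) {
            // The schema metadata is not needed for this trivial conversion.
            deserialize(record, out);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }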
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java
new file mode 100644
index 000000000..321b9ac14
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/DebeziumSourceFunction.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumChangeConsumer;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumChangeFetcher;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffset;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffsetSerializer;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseHistory;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkOffsetBackingStore;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.Handover;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.SchemaRecord;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
+ * from databases into Flink.
+ *
+ * <p>There are two workers during the runtime. One worker periodically pulls records from the
+ * database and pushes the records into the {@link Handover}. The other worker consumes the records
+ * from the {@link Handover} and converts the records into Flink-style data. The reason for not
+ * using a single worker is that Debezium behaves differently in the snapshot phase and the
+ * streaming phase.
+ *
+ * <p>Here we use the {@link Handover} as the buffer to submit data from the producer to the
+ * consumer. Because the two threads don't communicate with each other directly, the error reporting
+ * also relies on {@link Handover}. When the engine gets errors, the engine uses the {@link
+ * DebeziumEngine.CompletionCallback} to report errors to the {@link Handover} and wakes up the
+ * consumer to check the error. However, the source function just closes the engine and wakes up the
+ * producer if the error is from the Flink side.
+ *
+ * <p>If the execution is canceled or finishes (snapshot phase only), the exit logic is the same
+ * as the logic in the error reporting.
+ *
+ * <p>The source function participates in checkpointing and guarantees that no data is lost during a
+ * failure, and that the computation processes elements "exactly once".
+ *
+ * <p>Note: currently, the source function can't run in multiple parallel instances.
+ *
+ * <p>Please refer to Debezium's documentation for the available configuration properties:
+ * https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties
+ */
+@PublicEvolving
+public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
+ implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
+
+ private static final long serialVersionUID = -5808108641062931623L;
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
+ /** State name of the consumer's partition offset states. */
+ public static final String OFFSETS_STATE_NAME = "offset-states";
+
+ /** State name of the consumer's history records state. */
+ public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
+ /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
+ public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+ /**
+ * The configuration key that indicates whether the Debezium MySQL connector uses the legacy
+ * implementation.
+ */
+ public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
+ /** The configuration value represents legacy implementation. */
+ public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+
+ // ---------------------------------------------------------------------------------------
+ // Properties
+ // ---------------------------------------------------------------------------------------
+
+ /** The schema to convert from Debezium's messages into Flink's objects. */
+ private final DebeziumDeserializationSchema<T> deserializer;
+
+ /** User-supplied properties for Kafka. */
+ private final Properties properties;
+
+ /** The specific binlog offset to read from on the first startup. */
+ private final @Nullable DebeziumOffset specificOffset;
+
+ /** Data for pending but uncommitted offsets. */
+ private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+ /** Flag indicating whether the Debezium Engine is started. */
+ private volatile boolean debeziumStarted = false;
+
+ /** Validator that checks whether the connected database satisfies the CDC connector's requirements. */
+ private final Validator validator;
+
+ // ---------------------------------------------------------------------------------------
+ // State
+ // ---------------------------------------------------------------------------------------
+
+ /**
+ * The offsets to restore to, if the consumer restores state from a checkpoint.
+ *
+ * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+ * method.
+ *
+ * <p>Using a String because we are encoding the offset state in JSON bytes.
+ */
+ private transient volatile String restoredOffsetState;
+
+ /** Accessor for state in the operator state backend. */
+ private transient ListState<byte[]> offsetState;
+
+ /**
+ * State to store the history records, i.e. schema changes.
+ *
+ * @see FlinkDatabaseHistory
+ * @see FlinkDatabaseSchemaHistory
+ */
+ private transient ListState<String> schemaRecordsState;
+
+ // ---------------------------------------------------------------------------------------
+ // Worker
+ // ---------------------------------------------------------------------------------------
+
+ private transient ExecutorService executor;
+ private transient DebeziumEngine<?> engine;
+ /**
+ * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
+ * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
+ */
+ private transient String engineInstanceName;
+
+ /** Consume the events from the engine and commit the offset to the engine. */
+ private transient DebeziumChangeConsumer changeConsumer;
+
+ /** The consumer to fetch records from {@link Handover}. */
+ private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
+
+ /** Buffers the events from the source and records the errors from Debezium. */
+ private transient Handover handover;
+
+ // ---------------------------------------------------------------------------------------
+
+ public DebeziumSourceFunction(
+ DebeziumDeserializationSchema<T> deserializer,
+ Properties properties,
+ @Nullable DebeziumOffset specificOffset,
+ Validator validator) {
+ this.deserializer = deserializer;
+ this.properties = properties;
+ this.specificOffset = specificOffset;
+ this.validator = validator;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ validator.validate();
+ super.open(parameters);
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
+ this.executor = Executors.newSingleThreadExecutor(threadFactory);
+ this.handover = new Handover();
+ this.changeConsumer = new DebeziumChangeConsumer(handover);
+ }
+
+ // ------------------------------------------------------------------------
+ // Checkpoint and restore
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ OperatorStateStore stateStore = context.getOperatorStateStore();
+ this.offsetState =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ OFFSETS_STATE_NAME,
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+ this.schemaRecordsState =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
+
+ if (context.isRestored()) {
+ restoreOffsetState();
+ restoreHistoryRecordsState();
+ } else {
+ if (specificOffset != null) {
+ byte[] serializedOffset =
+ DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+ restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+ LOG.info(
+ "Consumer subtask {} starts to read from specified offset {}.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ restoredOffsetState);
+ } else {
+ LOG.info(
+ "Consumer subtask {} has no restore state.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+ }
+
+ private void restoreOffsetState() throws Exception {
+ for (byte[] serializedOffset : offsetState.get()) {
+ if (restoredOffsetState == null) {
+ restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+ } else {
+ throw new RuntimeException(
+ "Debezium Source only support single task, "
+ + "however, this is restored from multiple tasks.");
+ }
+ }
+ LOG.info(
+ "Consumer subtask {} restored offset state: {}.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ restoredOffsetState);
+ }
+
+ private void restoreHistoryRecordsState() throws Exception {
+ DocumentReader reader = DocumentReader.defaultReader();
+ ConcurrentLinkedQueue<SchemaRecord> historyRecords = new ConcurrentLinkedQueue<>();
+ int recordsCount = 0;
+ boolean firstEntry = true;
+ for (String record : schemaRecordsState.get()) {
+ if (firstEntry) {
+ // we store the engine instance name in the first element
+ this.engineInstanceName = record;
+ firstEntry = false;
+ } else {
+ // Put the records into the state. The database history should read, reorganize and
+ // register the state.
+ historyRecords.add(new SchemaRecord(reader.read(record)));
+ recordsCount++;
+ }
+ }
+ if (engineInstanceName != null) {
+ registerHistory(engineInstanceName, historyRecords);
+ }
+ LOG.info(
+ "Consumer subtask {} restored history records state: {} with {} records.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ engineInstanceName,
+ recordsCount);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ if (handover.hasError()) {
+ LOG.debug("snapshotState() called on closed source");
+ throw new FlinkRuntimeException(
+ "Call snapshotState() on closed source, checkpoint failed.");
+ } else {
+ snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+ snapshotHistoryRecordsState();
+ }
+ }
+
+ private void snapshotOffsetState(long checkpointId) throws Exception {
+ offsetState.clear();
+
+ final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
+
+ byte[] serializedOffset = null;
+ if (fetcher == null) {
+ // the fetcher has not yet been initialized, which means we need to return the
+ // originally restored offsets
+ if (restoredOffsetState != null) {
+ serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+ }
+ } else {
+ byte[] currentState = fetcher.snapshotCurrentState();
+ if (currentState == null && restoredOffsetState != null) {
+ // the fetcher has been initialized, but has not yet received any data,
+ // which means we need to return the originally restored offsets.
+ serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+ } else {
+ serializedOffset = currentState;
+ }
+ }
+
+ if (serializedOffset != null) {
+ offsetState.add(serializedOffset);
+ // the map cannot be asynchronously updated, because only one checkpoint call
+ // can happen on this function at a time: either snapshotState() or
+ // notifyCheckpointComplete()
+ pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+ // truncate the map of pending offsets to commit, to prevent infinite growth
+ while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+ pendingOffsetsToCommit.remove(0);
+ }
+ }
+ }
+
+ private void snapshotHistoryRecordsState() throws Exception {
+ schemaRecordsState.clear();
+
+ if (engineInstanceName != null) {
+ schemaRecordsState.add(engineInstanceName);
+ Collection<SchemaRecord> records = retrieveHistory(engineInstanceName);
+ DocumentWriter writer = DocumentWriter.defaultWriter();
+ for (SchemaRecord record : records) {
+ schemaRecordsState.add(writer.write(record.toDocument()));
+ }
+ }
+ }
+
+ @Override
+ public void run(SourceContext<T> sourceContext) throws Exception {
+ properties.setProperty("name", "engine");
+ properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
+ if (restoredOffsetState != null) {
+ // restored from state
+ properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
+ }
+ // DO NOT include schema change, e.g. DDL
+ properties.setProperty("include.schema.changes", "false");
+ // disable the offset flush totally
+ properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
+ // disable tombstones
+ properties.setProperty("tombstones.on.delete", "false");
+ if (engineInstanceName == null) {
+ // not restore from recovery
+ engineInstanceName = UUID.randomUUID().toString();
+ }
+ // history instance name to initialize FlinkDatabaseHistory
+ properties.setProperty(
+ FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
+ // we have to use a persisted DatabaseHistory implementation; otherwise, recovery can't
+ // continue to read the binlog
+ // see
+ // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
+ // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
+ properties.setProperty("database.history", determineDatabase().getCanonicalName());
+
+ // we have to filter out the heartbeat events, otherwise the deserializer will fail
+ String dbzHeartbeatPrefix =
+ properties.getProperty(
+ Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
+ Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
+ this.debeziumChangeFetcher =
+ new DebeziumChangeFetcher<>(
+ sourceContext,
+ deserializer,
+ restoredOffsetState == null, // DB snapshot phase if restore state is null
+ dbzHeartbeatPrefix,
+ handover);
+
+ // create the engine with this configuration ...
+ this.engine =
+ DebeziumEngine.create(Connect.class)
+ .using(properties)
+ .notifying(changeConsumer)
+ .using(OffsetCommitPolicy.always())
+ .using(
+ (success, message, error) -> {
+ if (success) {
+ // Close the handover and prepare to exit.
+ handover.close();
+ } else {
+ handover.reportError(error);
+ }
+ })
+ .build();
+
+ // run the engine asynchronously
+ executor.execute(engine);
+ debeziumStarted = true;
+
+ // initialize metrics
+ // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
+ final Method getMetricGroupMethod =
+ getRuntimeContext().getClass().getMethod("getMetricGroup");
+ getMetricGroupMethod.setAccessible(true);
+ final MetricGroup metricGroup =
+ (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+ metricGroup.gauge(
+ "currentFetchEventTimeLag",
+ (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+ metricGroup.gauge(
+ "currentEmitEventTimeLag",
+ (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+ metricGroup.gauge(
+ "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
+
+ // start the real debezium consumer
+ debeziumChangeFetcher.runFetchLoop();
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ if (!debeziumStarted) {
+ LOG.debug("notifyCheckpointComplete() called when engine is not started.");
+ return;
+ }
+
+ final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
+ if (fetcher == null) {
+ LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+ return;
+ }
+
+ try {
+ final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn(
+ "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ checkpointId);
+ return;
+ }
+
+ byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
+
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingOffsetsToCommit.remove(0);
+ }
+
+ if (serializedOffsets == null || serializedOffsets.length == 0) {
+ LOG.debug(
+ "Consumer subtask {} has empty checkpoint state.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ return;
+ }
+
+ DebeziumOffset offset =
+ DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
+ changeConsumer.commitOffset(offset);
+ } catch (Exception e) {
+ // ignore exception if we are no longer running
+ LOG.warn("Ignore error when committing offset to database.", e);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // safely and gracefully stop the engine
+ shutdownEngine();
+ if (debeziumChangeFetcher != null) {
+ debeziumChangeFetcher.close();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ cancel();
+
+ if (executor != null) {
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ }
+
+ super.close();
+ }
+
+ /** Safely and gracefully stop the Debezium engine. */
+ private void shutdownEngine() {
+ try {
+ if (engine != null) {
+ engine.close();
+ }
+ } catch (IOException e) {
+ ExceptionUtils.rethrow(e);
+ } finally {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+
+ debeziumStarted = false;
+
+ if (handover != null) {
+ handover.close();
+ }
+ }
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializer.getProducedType();
+ }
+
+ @VisibleForTesting
+ public LinkedMap getPendingOffsetsToCommit() {
+ return pendingOffsetsToCommit;
+ }
+
+ @VisibleForTesting
+ public boolean getDebeziumStarted() {
+ return debeziumStarted;
+ }
+
+ private Class<?> determineDatabase() {
+ boolean isCompatibleWithLegacy =
+ FlinkDatabaseHistory.isCompatible(retrieveHistory(engineInstanceName));
+ if (LEGACY_IMPLEMENTATION_VALUE.equals(properties.get(LEGACY_IMPLEMENTATION_KEY))) {
+ // specifies the legacy implementation but the state may be incompatible
+ if (isCompatibleWithLegacy) {
+ return FlinkDatabaseHistory.class;
+ } else {
+ throw new IllegalStateException(
+ "The configured option 'debezium.internal.implementation' is 'legacy', "
+ + "but the state of source is incompatible with this implementation, "
+ + "you should remove the the option.");
+ }
+ } else if (FlinkDatabaseSchemaHistory.isCompatible(retrieveHistory(engineInstanceName))) {
+ // tries the non-legacy first
+ return FlinkDatabaseSchemaHistory.class;
+ } else if (isCompatibleWithLegacy) {
+ // fallback to legacy if possible
+ return FlinkDatabaseHistory.class;
+ } else {
+ // impossible
+ throw new IllegalStateException("Can't determine which DatabaseHistory to use.");
+ }
+ }
+
+ @VisibleForTesting
+ public String getEngineInstanceName() {
+ return engineInstanceName;
+ }
+}
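To show how the source added above is typically wired into a job, here is a hedged sketch. The
connection values are placeholders, property names follow Debezium 1.5, and it uses the constructor
and JSON deserializer from this commit:

    import java.util.Properties;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MySqlCdcJobSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Standard Debezium MySQL connector properties (Debezium 1.5 names); values are placeholders.
            props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
            props.setProperty("database.hostname", "localhost");
            props.setProperty("database.port", "3306");
            props.setProperty("database.user", "inlong");
            props.setProperty("database.password", "secret");
            props.setProperty("database.server.name", "mysql-cdc-sketch");
            props.setProperty("database.include.list", "test_db");

            DebeziumSourceFunction<String> source =
                    new DebeziumSourceFunction<>(
                            new JsonDebeziumDeserializationSchema(),
                            props,
                            null, // no specific start offset, so a snapshot runs first
                            Validator.getDefaultValidator());

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // The source can't run in multiple parallel instances (see the class Javadoc).
            env.addSource(source).setParallelism(1).print();
            env.execute("mysql-cdc-sketch");
        }
    }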
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java
new file mode 100644
index 000000000..1539a3570
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/JsonDebeziumDeserializationSchema.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+
+/**
+ * A JSON format implementation of {@link DebeziumDeserializationSchema} which deserializes the
+ * received {@link SourceRecord} to JSON String.
+ */
+public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient JsonConverter jsonConverter;
+
+ /**
+ * Configuration that determines whether to enable {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG}
+ * to include the schema in messages.
+ */
+ private final Boolean includeSchema;
+
+ /** The custom configurations for {@link JsonConverter}. */
+ private Map<String, Object> customConverterConfigs;
+
+ public JsonDebeziumDeserializationSchema() {
+ this(false);
+ }
+
+ public JsonDebeziumDeserializationSchema(Boolean includeSchema) {
+ this.includeSchema = includeSchema;
+ }
+
+ public JsonDebeziumDeserializationSchema(
+ Boolean includeSchema, Map<String, Object> customConverterConfigs) {
+ this.includeSchema = includeSchema;
+ this.customConverterConfigs = customConverterConfigs;
+ }
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
+ if (jsonConverter == null) {
+ initializeJsonConverter();
+ }
+ byte[] bytes =
+ jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
+ out.collect(new String(bytes));
+ }
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<String> out, TableChange tableChange)
+ throws Exception {
+ // Table-change metadata is not used by the JSON format, so this overload is a no-op.
+ }
+
+ /** Initialize {@link JsonConverter} with given configs. */
+ private void initializeJsonConverter() {
+ jsonConverter = new JsonConverter();
+ final HashMap<String, Object> configs = new HashMap<>(2);
+ configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
+ configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
+ if (customConverterConfigs != null) {
+ configs.putAll(customConverterConfigs);
+ }
+ jsonConverter.configure(configs);
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+}
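As a usage note, the customConverterConfigs hook above lets callers tune the underlying
JsonConverter. A small sketch, assuming a Kafka version whose JsonConverterConfig exposes
DECIMAL_FORMAT_CONFIG (Kafka 2.4+):

    Map<String, Object> converterConfigs = new HashMap<>();
    // Render Connect DECIMAL values as plain numbers instead of base64-encoded bytes.
    converterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "NUMERIC");

    JsonDebeziumDeserializationSchema schema =
            new JsonDebeziumDeserializationSchema(true, converterConfigs);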
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java
new file mode 100644
index 000000000..1b7931abd
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/StringDebeziumDeserializationSchema.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * A simple implementation of {@link DebeziumDeserializationSchema} which converts the received
+ * {@link SourceRecord} into String.
+ */
+public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
+ private static final long serialVersionUID = -3168848963265670603L;
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
+ out.collect(record.toString());
+ }
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<String> out, TableChange tableChange)
+ throws Exception {
+ // Table-change metadata is not used when converting records to strings; this overload is a no-op.
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/Validator.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/Validator.java
new file mode 100644
index 000000000..f39147673
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/Validator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium;
+
+import java.io.Serializable;
+
+/** Validator that checks whether the connected database satisfies the CDC connector's requirements. */
+public interface Validator extends Serializable {
+
+ void validate();
+
+ static Validator getDefaultValidator() {
+ return () -> {
+ // do nothing;
+ };
+ }
+}
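Since Validator is effectively a single-method interface, a connector can supply its precondition
check as a lambda. An illustrative sketch, with the actual database check left as a placeholder:

    Validator rowFormatValidator = () -> {
        // A real implementation would query `SHOW VARIABLES LIKE 'binlog_format'` over JDBC.
        boolean binlogFormatIsRow = true; // placeholder result
        if (!binlogFormatIsRow) {
            throw new IllegalStateException("MySQL binlog_format must be ROW for CDC to work.");
        }
    };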
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java
new file mode 100644
index 000000000..856d7fa83
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/history/FlinkJsonTableChangeSerializer.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.history;
+
+import io.debezium.document.Array;
+import io.debezium.document.Array.Entry;
+import io.debezium.document.Document;
+import io.debezium.document.Value;
+import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import io.debezium.relational.history.TableChanges.TableChangeType;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * The serializer responsible for converting {@link TableChanges} into a JSON format. Copied from
+ * io.debezium.relational.history.JsonTableChangeSerializer, but adds serialization/deserialization
+ * for a column's enumValues.
+ */
+public class FlinkJsonTableChangeSerializer implements TableChanges.TableChangesSerializer<Array> {
+
+ @Override
+ public Array serialize(TableChanges tableChanges) {
+ List<Value> values =
+ StreamSupport.stream(tableChanges.spliterator(), false)
+ .map(this::toDocument)
+ .map(Value::create)
+ .collect(Collectors.toList());
+
+ return Array.create(values);
+ }
+
+ public Document toDocument(TableChange tableChange) {
+ Document document = Document.create();
+
+ document.setString("type", tableChange.getType().name());
+ document.setString("id", tableChange.getId().toDoubleQuotedString());
+ document.setDocument("table", toDocument(tableChange.getTable()));
+ return document;
+ }
+
+ private Document toDocument(Table table) {
+ Document document = Document.create();
+
+ document.set("defaultCharsetName", table.defaultCharsetName());
+ document.set("primaryKeyColumnNames", Array.create(table.primaryKeyColumnNames()));
+
+ List<Document> columns =
+ table.columns().stream().map(this::toDocument).collect(Collectors.toList());
+
+ document.setArray("columns", Array.create(columns));
+
+ return document;
+ }
+
+ private Document toDocument(Column column) {
+ Document document = Document.create();
+
+ document.setString("name", column.name());
+ document.setNumber("jdbcType", column.jdbcType());
+
+ if (column.nativeType() != Column.UNSET_INT_VALUE) {
+ document.setNumber("nativeType", column.nativeType());
+ }
+
+ document.setString("typeName", column.typeName());
+ document.setString("typeExpression", column.typeExpression());
+ document.setString("charsetName", column.charsetName());
+
+ if (column.length() != Column.UNSET_INT_VALUE) {
+ document.setNumber("length", column.length());
+ }
+
+ column.scale().ifPresent(s -> document.setNumber("scale", s));
+
+ document.setNumber("position", column.position());
+ document.setBoolean("optional", column.isOptional());
+ document.setBoolean("autoIncremented", column.isAutoIncremented());
+ document.setBoolean("generated", column.isGenerated());
+
+ // BEGIN FLINK MODIFICATION
+ document.setArray("enumValues", column.enumValues().toArray());
+ // END FLINK MODIFICATION
+
+ return document;
+ }
+
+ @Override
+ public TableChanges deserialize(Array array, boolean useCatalogBeforeSchema) {
+ TableChanges tableChanges = new TableChanges();
+
+ for (Entry entry : array) {
+ TableChange change =
+ fromDocument(entry.getValue().asDocument(), useCatalogBeforeSchema);
+
+ if (change.getType() == TableChangeType.CREATE) {
+ tableChanges.create(change.getTable());
+ } else if (change.getType() == TableChangeType.ALTER) {
+ tableChanges.alter(change.getTable());
+ } else if (change.getType() == TableChangeType.DROP) {
+ tableChanges.drop(change.getTable());
+ }
+ }
+
+ return tableChanges;
+ }
+
+ private static Table fromDocument(TableId id, Document document) {
+ TableEditor editor =
+ Table.editor()
+ .tableId(id)
+ .setDefaultCharsetName(document.getString("defaultCharsetName"));
+
+ document.getArray("columns")
+ .streamValues()
+ .map(Value::asDocument)
+ .map(
+ v -> {
+ ColumnEditor columnEditor =
+ Column.editor()
+ .name(v.getString("name"))
+ .jdbcType(v.getInteger("jdbcType"));
+
+ Integer nativeType = v.getInteger("nativeType");
+ if (nativeType != null) {
+ columnEditor.nativeType(nativeType);
+ }
+
+ columnEditor
+ .type(v.getString("typeName"), v.getString("typeExpression"))
+ .charsetName(v.getString("charsetName"));
+
+ Integer length = v.getInteger("length");
+ if (length != null) {
+ columnEditor.length(length);
+ }
+
+ Integer scale = v.getInteger("scale");
+ if (scale != null) {
+ columnEditor.scale(scale);
+ }
+
+ columnEditor
+ .position(v.getInteger("position"))
+ .optional(v.getBoolean("optional"))
+ .autoIncremented(v.getBoolean("autoIncremented"))
+ .generated(v.getBoolean("generated"));
+
+ // BEGIN FLINK MODIFICATION
+ Array enumValues = v.getArray("enumValues");
+ if (enumValues != null && !enumValues.isEmpty()) {
+ columnEditor.enumValues(
+ enumValues
+ .streamValues()
+ .map(Value::asString)
+ .collect(Collectors.toList()));
+ }
+ // END FLINK MODIFICATION
+
+ return columnEditor.create();
+ })
+ .forEach(editor::addColumn);
+
+ editor.setPrimaryKeyNames(
+ document.getArray("primaryKeyColumnNames")
+ .streamValues()
+ .map(Value::asString)
+ .collect(Collectors.toList()));
+
+ return editor.create();
+ }
+
+ public static TableChange fromDocument(Document document, boolean useCatalogBeforeSchema) {
+ TableChangeType type = TableChangeType.valueOf(document.getString("type"));
+ TableId id = TableId.parse(document.getString("id"), useCatalogBeforeSchema);
+ Table table = null;
+
+ if (type == TableChangeType.CREATE || type == TableChangeType.ALTER) {
+ table = fromDocument(id, document.getDocument("table"));
+ } else {
+ table = Table.editor().tableId(id).create();
+ }
+ return new TableChange(type, table);
+ }
+}
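For a sense of the round trip this serializer performs, a hedged sketch against Debezium's 1.5
relational model (the table and column definitions are illustrative, and imports match the ones at
the top of this file):

    FlinkJsonTableChangeSerializer serializer = new FlinkJsonTableChangeSerializer();

    Table table = Table.editor()
            .tableId(TableId.parse("test_db.users"))
            .addColumn(Column.editor()
                    .name("id")
                    .type("INT")
                    .jdbcType(java.sql.Types.INTEGER)
                    .create())
            .setPrimaryKeyNames(java.util.Collections.singletonList("id"))
            .create();

    // Serialize a CREATE change to a Debezium JSON Array, then read it back.
    Array json = serializer.serialize(new TableChanges().create(table));
    TableChanges restored = serializer.deserialize(json, true);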
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java
new file mode 100644
index 000000000..bb31eadec
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeConsumer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import io.debezium.embedded.EmbeddedEngineChangeEvent;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.DebeziumEngine.RecordCommitter;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Consume debezium change events. */
+@Internal
+public class DebeziumChangeConsumer
+ implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
+ public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
+ public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";
+ private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
+
+ private final Handover handover;
+ // keep the modification visible to the source function
+ private volatile RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;
+
+ public DebeziumChangeConsumer(Handover handover) {
+ this.handover = handover;
+ }
+
+ @Override
+ public void handleBatch(
+ List<ChangeEvent<SourceRecord, SourceRecord>> events,
+ RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
+ try {
+ currentCommitter = recordCommitter;
+ handover.produce(events);
+ } catch (Throwable e) {
+ // Hold this exception in handover and trigger the fetcher to exit
+ handover.reportError(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void commitOffset(DebeziumOffset offset) throws InterruptedException {
+ // Although the committer is read and written by multiple threads, it will not change
+ // frequently.
+ if (currentCommitter == null) {
+ LOG.info(
+ "commitOffset() called on Debezium change consumer which doesn't receive records yet.");
+ return;
+ }
+
+ // only the offset is used
+ SourceRecord recordWrapper =
+ new SourceRecord(
+ offset.sourcePartition,
+ adjustSourceOffset((Map<String, Object>) offset.sourceOffset),
+ "DUMMY",
+ Schema.BOOLEAN_SCHEMA,
+ true);
+ EmbeddedEngineChangeEvent<SourceRecord, SourceRecord> changeEvent =
+ new EmbeddedEngineChangeEvent<>(null, recordWrapper, recordWrapper);
+ currentCommitter.markProcessed(changeEvent);
+ currentCommitter.markBatchFinished();
+ }
+
+ /**
+ * We have to adjust the type of LSN values to Long, because they might be Integer after
+ * deserialization; however, {@code
+ * io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(java.util.Map)}
+ * requires Long.
+ */
+ private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
+ if (sourceOffset.containsKey(LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
+ String value = sourceOffset.get(LAST_COMPLETELY_PROCESSED_LSN_KEY).toString();
+ sourceOffset.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value));
+ }
+ if (sourceOffset.containsKey(LAST_COMMIT_LSN_KEY)) {
+ String value = sourceOffset.get(LAST_COMMIT_LSN_KEY).toString();
+ sourceOffset.put(LAST_COMMIT_LSN_KEY, Long.parseLong(value));
+ }
+ return sourceOffset;
+ }
+}
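To make the Long adjustment above concrete: offsets restored from JSON state deserialize small
integral numbers as Integer, so an LSN read back from a checkpoint would otherwise fail the
engine's Long cast. An illustrative check with plain Jackson (the key name matches the constant
above):

    Map<String, Object> offset = new com.fasterxml.jackson.databind.ObjectMapper()
            .readValue("{\"lsn_commit\": 12345}", Map.class);
    // Jackson maps small JSON integers to Integer, not Long.
    assert offset.get("lsn_commit") instanceof Integer;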
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
new file mode 100644
index 000000000..ea44e7026
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumChangeFetcher.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.data.Envelope;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A handler that converts change messages from {@link DebeziumEngine} into Flink data. Because
+ * Debezium holds the lock with different strategies in different modes (e.g. snapshot), the
+ * handler also needs different strategies: in the snapshot phase, the handler needs to hold the
+ * lock until the snapshot finishes, while in the non-snapshot phase it only needs to hold the
+ * lock when emitting the records.
+ *
+ * @param <T> The type of elements produced by the handler.
+ */
+@Internal
+public class DebeziumChangeFetcher<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class);
+
+ private final SourceFunction.SourceContext<T> sourceContext;
+
+ /**
+ * The lock that guarantees that record emission and state updates are atomic, from the view of
+ * taking a checkpoint.
+ */
+ private final Object checkpointLock;
+
+ /** The schema to convert from Debezium's messages into Flink's objects. */
+ private final DebeziumDeserializationSchema<T> deserialization;
+
+ /** A collector to emit records in batch (bundle). */
+ private final DebeziumCollector debeziumCollector;
+
+ private final DebeziumOffset debeziumOffset;
+
+ private final DebeziumOffsetSerializer stateSerializer;
+
+ private final String heartbeatTopicPrefix;
+
+ private boolean isInDbSnapshotPhase;
+
+ private final Handover handover;
+
+ private volatile boolean isRunning = true;
+
+ // ---------------------------------------------------------------------------------------
+ // Metrics
+ // ---------------------------------------------------------------------------------------
+
+ /** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
+ private volatile long messageTimestamp = 0L;
+
+ /** The last record processing time. */
+ private volatile long processTime = 0L;
+
+ /**
+ * currentFetchEventTimeLag = FetchTime - messageTimestamp, where FetchTime is the time the
+ * record is fetched into the source operator.
+ */
+ private volatile long fetchDelay = 0L;
+
+ /**
+ * emitDelay = EmitTime - messageTimestamp, where EmitTime is the time the record leaves the
+ * source operator.
+ */
+ private volatile long emitDelay = 0L;
+
+ // ------------------------------------------------------------------------
+
+ public DebeziumChangeFetcher(
+ SourceFunction.SourceContext<T> sourceContext,
+ DebeziumDeserializationSchema<T> deserialization,
+ boolean isInDbSnapshotPhase,
+ String heartbeatTopicPrefix,
+ Handover handover) {
+ this.sourceContext = sourceContext;
+ this.checkpointLock = sourceContext.getCheckpointLock();
+ this.deserialization = deserialization;
+ this.isInDbSnapshotPhase = isInDbSnapshotPhase;
+ this.heartbeatTopicPrefix = heartbeatTopicPrefix;
+ this.debeziumCollector = new DebeziumCollector();
+ this.debeziumOffset = new DebeziumOffset();
+ this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
+ this.handover = handover;
+ }
+
+ /**
+ * Take a snapshot of the Debezium handler state.
+ *
+ * <p>Important: This method must be called under the checkpoint lock.
+ */
+ public byte[] snapshotCurrentState() throws Exception {
+ // this method assumes that the checkpoint lock is held
+ assert Thread.holdsLock(checkpointLock);
+ if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
+ return null;
+ }
+
+ return stateSerializer.serialize(debeziumOffset);
+ }
+
+ /**
+ * Process change messages from the {@link Handover} and collect the processed messages by
+ * {@link Collector}.
+ */
+ public void runFetchLoop() throws Exception {
+ try {
+ // begin snapshot database phase
+ if (isInDbSnapshotPhase) {
+ List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
+
+ synchronized (checkpointLock) {
+ LOG.info(
+ "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
+ handleBatch(events);
+ while (isRunning && isInDbSnapshotPhase) {
+ handleBatch(handover.pollNext());
+ }
+ }
+ LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
+ }
+
+ // begin streaming binlog phase
+ while (isRunning) {
+ // If the handover is closed or has errors, exit.
+ // If there is no streaming phase, the handover will be closed by the engine.
+ handleBatch(handover.pollNext());
+ }
+ } catch (Handover.ClosedException e) {
+ // ignore
+ }
+ }
+
+ public void close() {
+ isRunning = false;
+ handover.close();
+ }
+
+ // ---------------------------------------------------------------------------------------
+ // Metric getter
+ // ---------------------------------------------------------------------------------------
+
+ /**
+ * The metric indicates delay from data generation to entry into the system.
+ *
+ * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is
+ * unavailable.
+ */
+ public long getFetchDelay() {
+ return fetchDelay;
+ }
+
+ /**
+ * The metric indicates delay from data generation to leaving the source operator.
+ *
+ * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is
+ * unavailable.
+ */
+ public long getEmitDelay() {
+ return emitDelay;
+ }
+
+ public long getIdleTime() {
+ return System.currentTimeMillis() - processTime;
+ }
+
+ // ---------------------------------------------------------------------------------------
+ // Helper
+ // ---------------------------------------------------------------------------------------
+
+ private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
+ throws Exception {
+ if (CollectionUtils.isEmpty(changeEvents)) {
+ return;
+ }
+ this.processTime = System.currentTimeMillis();
+
+ for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
+ SourceRecord record = event.value();
+ updateMessageTimestamp(record);
+ fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp;
+
+ if (isHeartbeatEvent(record)) {
+ // keep offset update
+ synchronized (checkpointLock) {
+ debeziumOffset.setSourcePartition(record.sourcePartition());
+ debeziumOffset.setSourceOffset(record.sourceOffset());
+ }
+ // drop heartbeat events
+ continue;
+ }
+
+ deserialization.deserialize(record, debeziumCollector);
+
+ if (!isSnapshotRecord(record)) {
+ LOG.debug("Snapshot phase finishes.");
+ isInDbSnapshotPhase = false;
+ }
+
+ // emit the actual records. this also updates offset state atomically
+ emitRecordsUnderCheckpointLock(
+ debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
+ }
+ }
+
+ private void emitRecordsUnderCheckpointLock(
+ Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
+ // Emit the records. Use the checkpoint lock to guarantee
+ // atomicity of record emission and offset state update.
+        // Synchronization on checkpointLock is reentrant, so it is safe to lock it again in snapshot mode.
+ synchronized (checkpointLock) {
+ T record;
+ while ((record = records.poll()) != null) {
+ emitDelay =
+ isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp;
+ sourceContext.collect(record);
+ }
+ // update offset to state
+ debeziumOffset.setSourcePartition(sourcePartition);
+ debeziumOffset.setSourceOffset(sourceOffset);
+ }
+ }
+
+ private void updateMessageTimestamp(SourceRecord record) {
+ Schema schema = record.valueSchema();
+ Struct value = (Struct) record.value();
+ if (schema.field(Envelope.FieldName.SOURCE) == null) {
+ return;
+ }
+
+ Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+ if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
+ return;
+ }
+
+ Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
+ if (tsMs != null) {
+ this.messageTimestamp = tsMs;
+ }
+ }
+
+ private boolean isHeartbeatEvent(SourceRecord record) {
+ String topic = record.topic();
+ return topic != null && topic.startsWith(heartbeatTopicPrefix);
+ }
+
+ private boolean isSnapshotRecord(SourceRecord record) {
+ Struct value = (Struct) record.value();
+ if (value != null) {
+ Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+ SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
+            // even if it is the last record of the snapshot (i.e. SnapshotRecord.LAST),
+            // we can still recover from a checkpoint and continue to read the binlog,
+            // because the checkpoint contains the binlog position
+ return SnapshotRecord.TRUE == snapshotRecord;
+ }
+ return false;
+ }
+
+ // ---------------------------------------------------------------------------------------
+
+ private class DebeziumCollector implements Collector<T> {
+
+ private final Queue<T> records = new ArrayDeque<>();
+
+ @Override
+ public void collect(T record) {
+ records.add(record);
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java
new file mode 100644
index 000000000..a51aad8d4
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffset.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The state that the Flink Debezium Consumer holds for each instance.
+ *
+ * <p>This class describes the most basic state that Debezium uses for recovery, based on the
+ * Kafka Connect mechanism. It includes a sourcePartition and a sourceOffset.
+ *
+ * <p>The sourcePartition represents a single input sourcePartition that the record came from (e.g.
+ * a filename, table name, or topic-partition). The sourceOffset represents a position in that
+ * sourcePartition which can be used to resume consumption of data.
+ *
+ * <p>These values can have arbitrary structure and should be represented using
+ * org.apache.kafka.connect.data objects (or primitive values). For example, a database connector
+ * might specify the sourcePartition as a record containing { "db": "database_name", "table":
+ * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
+ */
+@Internal
+public class DebeziumOffset implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public Map<String, ?> sourcePartition;
+ public Map<String, ?> sourceOffset;
+
+ public void setSourcePartition(Map<String, ?> sourcePartition) {
+ this.sourcePartition = sourcePartition;
+ }
+
+ public void setSourceOffset(Map<String, ?> sourceOffset) {
+ this.sourceOffset = sourceOffset;
+ }
+
+ @Override
+ public String toString() {
+ return "DebeziumOffset{"
+ + "sourcePartition="
+ + sourcePartition
+ + ", sourceOffset="
+ + sourceOffset
+ + '}';
+ }
+}
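
For illustration, a minimal sketch of the offset this class holds for a MySQL source; the
concrete keys ("server", "file", "pos") follow the Debezium MySQL connector's conventions,
and the values are assumed examples:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffset;

    public class DebeziumOffsetExample {
        public static void main(String[] args) {
            DebeziumOffset offset = new DebeziumOffset();

            Map<String, String> partition = new HashMap<>();
            partition.put("server", "mysql_binlog_source"); // assumed logical server name
            offset.setSourcePartition(partition);

            Map<String, Object> position = new HashMap<>();
            position.put("file", "mysql-bin.000003"); // assumed binlog file name
            position.put("pos", 154L);                // assumed binlog position
            offset.setSourceOffset(position);

            System.out.println(offset);
        }
    }
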
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java
new file mode 100644
index 000000000..9f193a8d9
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/DebeziumOffsetSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import java.io.IOException;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/** Serializer implementation for a {@link DebeziumOffset}. */
+@Internal
+public class DebeziumOffsetSerializer {
+ public static final DebeziumOffsetSerializer INSTANCE = new DebeziumOffsetSerializer();
+
+ public byte[] serialize(DebeziumOffset debeziumOffset) throws IOException {
+        // we currently use JSON serialization for simplicity, as the state is very small.
+ // we can improve this in the future if needed
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.writeValueAsBytes(debeziumOffset);
+ }
+
+ public DebeziumOffset deserialize(byte[] bytes) throws IOException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.readValue(bytes, DebeziumOffset.class);
+ }
+}
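
A minimal round-trip sketch of the serializer, mirroring how snapshotCurrentState()
serializes the offset when a checkpoint is taken and how recovery deserializes it; the
offset content is the assumed MySQL example from above:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffset;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.DebeziumOffsetSerializer;

    public class DebeziumOffsetSerializerExample {
        public static void main(String[] args) throws IOException {
            DebeziumOffset offset = new DebeziumOffset();
            offset.setSourcePartition(Collections.singletonMap("server", "mysql_binlog_source"));
            offset.setSourceOffset(Collections.singletonMap("pos", 154L));

            // serialize to JSON bytes, as done when a checkpoint is taken
            byte[] bytes = DebeziumOffsetSerializer.INSTANCE.serialize(offset);
            System.out.println(new String(bytes, StandardCharsets.UTF_8));

            // restore from the checkpointed bytes
            DebeziumOffset restored = DebeziumOffsetSerializer.INSTANCE.deserialize(bytes);
            System.out.println(restored);
        }
    }
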
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java
new file mode 100644
index 000000000..77315db48
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseHistory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.removeHistory;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+/**
+ * Inspired by {@link io.debezium.relational.history.MemoryDatabaseHistory}, but the HistoryRecords
+ * are stored in Flink's state for persistence.
+ *
+ * <p>Note: This is not a clean solution because it depends on a global variable, and all the
+ * history records are kept in state (growing indefinitely). We may need to come up with a
+ * FileSystemDatabaseHistory in the future to store the history in HDFS.
+ */
+public class FlinkDatabaseHistory extends AbstractDatabaseHistory {
+
+ public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";
+
+ private ConcurrentLinkedQueue<SchemaRecord> schemaRecords;
+ private String instanceName;
+
+ /** Gets the registered HistoryRecords under the given instance name. */
+ private ConcurrentLinkedQueue<SchemaRecord> getRegisteredHistoryRecord(String instanceName) {
+ Collection<SchemaRecord> historyRecords = retrieveHistory(instanceName);
+ return new ConcurrentLinkedQueue<>(historyRecords);
+ }
+
+ @Override
+ public void configure(
+ Configuration config,
+ HistoryRecordComparator comparator,
+ DatabaseHistoryListener listener,
+ boolean useCatalogBeforeSchema) {
+ super.configure(config, comparator, listener, useCatalogBeforeSchema);
+ this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
+ this.schemaRecords = getRegisteredHistoryRecord(instanceName);
+
+ // register the schema changes into state
+ // every change should be visible to the source function
+ registerHistory(instanceName, schemaRecords);
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ removeHistory(instanceName);
+ }
+
+ @Override
+ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+ this.schemaRecords.add(new SchemaRecord(record));
+ }
+
+ @Override
+ protected void recoverRecords(Consumer<HistoryRecord> records) {
+ this.schemaRecords.stream().map(SchemaRecord::getHistoryRecord).forEach(records);
+ }
+
+ @Override
+ public boolean exists() {
+ return !schemaRecords.isEmpty();
+ }
+
+ @Override
+ public boolean storageExists() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "Flink Database History";
+ }
+
+ /**
+ * Determine whether the {@link FlinkDatabaseHistory} is compatible with the specified state.
+ */
+ public static boolean isCompatible(Collection<SchemaRecord> records) {
+ for (SchemaRecord record : records) {
+            // it suffices to check the first record, since all restored records share the same format
+ if (!record.isHistoryRecord()) {
+ return false;
+ } else {
+ break;
+ }
+ }
+ return true;
+ }
+}
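
A sketch of how an embedded Debezium engine can be pointed at this Flink-backed history
implementation. The "database.history" key follows Debezium's standard configuration
conventions (shown here as an assumption); the unique instance name is what
DatabaseHistoryUtil uses to route records back to the owning source function:

    import java.util.Properties;
    import java.util.UUID;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseHistory;

    public class FlinkDatabaseHistoryConfigExample {
        public static Properties historyProperties() {
            Properties props = new Properties();
            // tell Debezium to instantiate the Flink-backed history implementation
            props.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
            // a unique name, so that concurrent source tasks don't share history state
            props.setProperty(
                    FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME,
                    UUID.randomUUID().toString());
            return props;
        }
    }
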
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
new file mode 100644
index 000000000..1027b18ed
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkDatabaseSchemaHistory.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.removeHistory;
+import static org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+import static io.debezium.relational.history.TableChanges.TableChange;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import io.debezium.config.Configuration;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.relational.ddl.DdlParser;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.schema.DatabaseSchema;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * The {@link FlinkDatabaseSchemaHistory} only stores the latest schema of the monitored tables.
+ * When recovering from a checkpoint, it applies all the stored tables to the {@link
+ * DatabaseSchema} directly, without having to replay the history.
+ *
+ * <p>Since the data structure maintained in the {@link FlinkDatabaseSchemaHistory} is quite
+ * different from that of the {@link FlinkDatabaseHistory}, the two implementations are not
+ * compatible. Because it only maintains the latest schema of each table rather than all the
+ * history DDLs, it helps prevent OOM when there are massive history DDLs.
+ */
+public class FlinkDatabaseSchemaHistory implements DatabaseHistory {
+
+ public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";
+
+ private final FlinkJsonTableChangeSerializer tableChangesSerializer =
+ new FlinkJsonTableChangeSerializer();
+
+ private ConcurrentMap<TableId, SchemaRecord> latestTables;
+ private String instanceName;
+ private DatabaseHistoryListener listener;
+ private boolean storeOnlyMonitoredTablesDdl;
+ private boolean skipUnparseableDDL;
+ private boolean useCatalogBeforeSchema;
+
+ @Override
+ public void configure(
+ Configuration config,
+ HistoryRecordComparator comparator,
+ DatabaseHistoryListener listener,
+ boolean useCatalogBeforeSchema) {
+ this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
+ this.listener = listener;
+ this.storeOnlyMonitoredTablesDdl = config.getBoolean(STORE_ONLY_MONITORED_TABLES_DDL);
+ this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
+ this.useCatalogBeforeSchema = useCatalogBeforeSchema;
+
+ // recover
+ this.latestTables = new ConcurrentHashMap<>();
+ for (SchemaRecord schemaRecord : retrieveHistory(instanceName)) {
+            // deserializing the record here validates the restored state
+ TableChange tableChange =
+ FlinkJsonTableChangeSerializer.fromDocument(
+ schemaRecord.toDocument(), useCatalogBeforeSchema);
+ latestTables.put(tableChange.getId(), schemaRecord);
+ }
+ // register
+ registerHistory(instanceName, latestTables.values());
+ }
+
+ @Override
+ public void start() {
+ listener.started();
+ }
+
+ @Override
+ public void record(
+ Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl)
+ throws DatabaseHistoryException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "The %s cannot work with 'debezium.internal.implementation' = 'legacy',"
+ + "please use %s",
+ FlinkDatabaseSchemaHistory.class.getCanonicalName(),
+ FlinkDatabaseHistory.class.getCanonicalName()));
+ }
+
+ @Override
+ public void record(
+ Map<String, ?> source,
+ Map<String, ?> position,
+ String databaseName,
+ String schemaName,
+ String ddl,
+ TableChanges changes)
+ throws DatabaseHistoryException {
+ for (TableChanges.TableChange change : changes) {
+ switch (change.getType()) {
+ case CREATE:
+ case ALTER:
+ latestTables.put(
+ change.getId(),
+ new SchemaRecord(tableChangesSerializer.toDocument(change)));
+ break;
+ case DROP:
+ latestTables.remove(change.getId());
+ break;
+ default:
+ // impossible
+ throw new RuntimeException(
+ String.format("Unknown change type: %s.", change.getType()));
+ }
+ }
+ listener.onChangeApplied(
+ new HistoryRecord(source, position, databaseName, schemaName, ddl, changes));
+ }
+
+ @Override
+ public void recover(
+ Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
+ listener.recoveryStarted();
+ for (SchemaRecord record : latestTables.values()) {
+ TableChange tableChange =
+ FlinkJsonTableChangeSerializer.fromDocument(
+ record.getTableChangeDoc(), useCatalogBeforeSchema);
+ schema.overwriteTable(tableChange.getTable());
+ }
+ listener.recoveryStopped();
+ }
+
+ @Override
+ public void stop() {
+ if (instanceName != null) {
+ removeHistory(instanceName);
+ }
+ listener.stopped();
+ }
+
+ @Override
+ public boolean exists() {
+ return latestTables != null && !latestTables.isEmpty();
+ }
+
+ @Override
+ public boolean storageExists() {
+ return true;
+ }
+
+ @Override
+ public void initializeStorage() {
+ // do nothing
+ }
+
+ @Override
+ public boolean storeOnlyMonitoredTables() {
+ return storeOnlyMonitoredTablesDdl;
+ }
+
+ @Override
+ public boolean skipUnparseableDdlStatements() {
+ return skipUnparseableDDL;
+ }
+
+ /**
+ * Determine whether the {@link FlinkDatabaseSchemaHistory} is compatible with the specified
+ * state.
+ */
+ public static boolean isCompatible(Collection<SchemaRecord> records) {
+ for (SchemaRecord record : records) {
+ if (!record.isTableChangeRecord()) {
+ return false;
+ } else {
+ break;
+ }
+ }
+ return true;
+ }
+}
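
The two isCompatible checks allow a restoring job to pick whichever history implementation
matches the checkpointed records. A sketch of that selection, under the assumption that
the caller has already retrieved the restored records (the resolver class and method names
here are hypothetical):

    import java.util.Collection;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseHistory;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.SchemaRecord;

    public class HistoryImplementationResolver {
        /** Picks a history implementation that can consume the restored records. */
        public static String resolveHistoryClass(Collection<SchemaRecord> restored) {
            if (FlinkDatabaseHistory.isCompatible(restored)) {
                return FlinkDatabaseHistory.class.getCanonicalName();
            } else if (FlinkDatabaseSchemaHistory.isCompatible(restored)) {
                return FlinkDatabaseSchemaHistory.class.getCanonicalName();
            }
            throw new IllegalStateException(
                    "Restored schema records are compatible with neither history implementation.");
        }
    }
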
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java
new file mode 100644
index 000000000..d36516c29
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/FlinkOffsetBackingStore.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
+import io.debezium.embedded.EmbeddedEngine;
+import io.debezium.engine.DebeziumEngine;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link OffsetBackingStore} backed by Flink's state mechanism.
+ *
+ * <p>The {@link #OFFSET_STATE_VALUE} in the {@link WorkerConfig} is the raw position and offset
+ * data in JSON format. It is set into the config by {@link DebeziumSourceFunction} when
+ * recovering from a failover, before the {@link DebeziumEngine} is started up. If it is not a
+ * restoration, the {@link #OFFSET_STATE_VALUE} is empty. The {@link DebeziumEngine} relies on the
+ * {@link OffsetBackingStore} for failover recovery.
+ *
+ * @see DebeziumSourceFunction
+ */
+public class FlinkOffsetBackingStore implements OffsetBackingStore {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkOffsetBackingStore.class);
+
+ public static final String OFFSET_STATE_VALUE = "offset.storage.flink.state.value";
+ public static final int FLUSH_TIMEOUT_SECONDS = 10;
+
+ protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
+ protected ExecutorService executor;
+
+ @Override
+ public void configure(WorkerConfig config) {
+ // eagerly initialize the executor, because OffsetStorageWriter will use it later
+ start();
+
+ Map<String, ?> conf = config.originals();
+ if (!conf.containsKey(OFFSET_STATE_VALUE)) {
+            // a normal startup from a clean state, no need to initialize the offset
+ return;
+ }
+
+ String stateJson = (String) conf.get(OFFSET_STATE_VALUE);
+ DebeziumOffsetSerializer serializer = new DebeziumOffsetSerializer();
+ DebeziumOffset debeziumOffset;
+ try {
+ debeziumOffset = serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ LOG.error("Can't deserialize debezium offset state from JSON: " + stateJson, e);
+ throw new RuntimeException(e);
+ }
+
+ final String engineName = (String) conf.get(EmbeddedEngine.ENGINE_NAME.name());
+ Converter keyConverter = new JsonConverter();
+ Converter valueConverter = new JsonConverter();
+ keyConverter.configure(config.originals(), true);
+ Map<String, Object> valueConfigs = new HashMap<>(conf);
+ valueConfigs.put("schemas.enable", false);
+ valueConverter.configure(valueConfigs, true);
+ OffsetStorageWriter offsetWriter =
+ new OffsetStorageWriter(
+ this,
+ // must use engineName as namespace to align with Debezium Engine
+ // implementation
+ engineName,
+ keyConverter,
+ valueConverter);
+
+ offsetWriter.offset(debeziumOffset.sourcePartition, debeziumOffset.sourceOffset);
+
+ // flush immediately
+ if (!offsetWriter.beginFlush()) {
+            // if nothing needs to be flushed, there must be something wrong with the
+            // initialization
+            LOG.warn(
+                    "Initialized FlinkOffsetBackingStore from an empty offset state; this shouldn't happen.");
+ return;
+ }
+
+ // trigger flushing
+ Future<Void> flushFuture =
+ offsetWriter.doFlush(
+ (error, result) -> {
+ if (error != null) {
+ LOG.error("Failed to flush initial offset.", error);
+ } else {
+ LOG.debug("Successfully flush initial offset.");
+ }
+ });
+
+ // wait until flushing finished
+ try {
+ flushFuture.get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ LOG.info(
+ "Flush offsets successfully, partition: {}, offsets: {}",
+ debeziumOffset.sourcePartition,
+ debeziumOffset.sourceOffset);
+ } catch (InterruptedException e) {
+ LOG.warn("Flush offsets interrupted, cancelling.", e);
+ offsetWriter.cancelFlush();
+ } catch (ExecutionException e) {
+ LOG.error("Flush offsets threw an unexpected exception.", e);
+ offsetWriter.cancelFlush();
+ } catch (TimeoutException e) {
+ LOG.error("Timed out waiting to flush offsets to storage.", e);
+ offsetWriter.cancelFlush();
+ }
+ }
+
+ @Override
+ public void start() {
+ if (executor == null) {
+ executor =
+ Executors.newFixedThreadPool(
+ 1,
+ ThreadUtils.createThreadFactory(
+ this.getClass().getSimpleName() + "-%d", false));
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (executor != null) {
+ executor.shutdown();
+ // Best effort wait for any get() and set() tasks (and caller's callbacks) to complete.
+ try {
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (!executor.shutdownNow().isEmpty()) {
+ throw new ConnectException(
+ "Failed to stop FlinkOffsetBackingStore. Exiting without cleanly "
+ + "shutting down pending tasks and/or callbacks.");
+ }
+ executor = null;
+ }
+ }
+
+ @Override
+ public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
+ return executor.submit(
+ () -> {
+ Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
+ for (ByteBuffer key : keys) {
+ result.put(key, data.get(key));
+ }
+ return result;
+ });
+ }
+
+ @Override
+ public Future<Void> set(
+ final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
+ return executor.submit(
+ () -> {
+ for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
+ data.put(entry.getKey(), entry.getValue());
+ }
+ if (callback != null) {
+ callback.onCompletion(null, null);
+ }
+ return null;
+ });
+ }
+}
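
A sketch of the restore path: on failover, the source function serializes the checkpointed
DebeziumOffset to JSON and passes it to the embedded engine under OFFSET_STATE_VALUE, so
that configure() can flush it before the connector starts. The "offset.storage" key follows
the embedded engine's configuration conventions, and the JSON content shown in the comment
is an assumed MySQL example:

    import java.util.Properties;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.FlinkOffsetBackingStore;

    public class FlinkOffsetBackingStoreConfigExample {
        public static Properties restoreProperties(String restoredOffsetJson) {
            Properties props = new Properties();
            // let the embedded engine use the Flink-backed offset store
            props.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
            // the raw offset JSON recovered from Flink state, e.g.
            // {"sourcePartition":{"server":"mysql_binlog_source"},
            //  "sourceOffset":{"file":"mysql-bin.000003","pos":154}}
            props.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetJson);
            return props;
        }
    }
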
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java
new file mode 100644
index 000000000..9af2ff246
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/Handover.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import io.debezium.engine.ChangeEvent;
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Handover is a utility to hand over data (a buffer of records) and exceptions from a
+ * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a "size one
+ * blocking queue", with some extras around exception reporting, closing, and waking up threads
+ * without {@link Thread#interrupt() interrupting} them.
+ *
+ * <p>This class is used in the Flink Debezium Engine Consumer to hand over data and exceptions
+ * between the thread that runs the DebeziumEngine class and the main thread.
+ *
+ * <p>The Handover can also be "closed", signalling from one thread to the other that it the thread
+ * has terminated.
+ */
+@ThreadSafe
+@Internal
+public class Handover implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private List<ChangeEvent<SourceRecord, SourceRecord>> next;
+
+ @GuardedBy("lock")
+ private Throwable error;
+
+ private boolean wakeupProducer;
+
+ /**
+ * Polls the next element from the Handover, possibly blocking until the next element is
+     * available. This method behaves similarly to polling from a blocking queue.
+ *
+ * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then that
+ * exception is thrown rather than an element being returned.
+ *
+ * @return The next element (buffer of records, never null).
+ * @throws ClosedException Thrown if the Handover was {@link #close() closed}.
+ * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
+ */
+ public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
+ synchronized (lock) {
+ while (next == null && error == null) {
+ lock.wait();
+ }
+ List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
+ if (n != null) {
+ next = null;
+ lock.notifyAll();
+ return n;
+ } else {
+ ExceptionUtils.rethrowException(error, error.getMessage());
+
+ // this statement cannot be reached since the above method always throws an
+                // exception; this is only here to silence the compiler and any warnings
+ return Collections.emptyList();
+ }
+ }
+ }
+
+ /**
+ * Hands over an element from the producer. If the Handover already has an element that was not
+ * yet picked up by the consumer thread, this call blocks until the consumer picks up that
+ * previous element.
+ *
+ * <p>This behavior is similar to a "size one" blocking queue.
+ *
+ * @param element The next element to hand over.
+ * @throws InterruptedException Thrown, if the thread is interrupted while blocking for the
+ * Handover to be empty.
+ */
+ public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
+ throws InterruptedException {
+
+ checkNotNull(element);
+
+ synchronized (lock) {
+ while (next != null && !wakeupProducer) {
+ lock.wait();
+ }
+
+ wakeupProducer = false;
+
+ // an error marks this as closed for the producer
+ if (error != null) {
+ ExceptionUtils.rethrow(error, error.getMessage());
+ } else {
+ // if there is no error, then this is open and can accept this element
+ next = element;
+ lock.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Reports an exception. The consumer will throw the given exception immediately, if it is
+ * currently blocked in the {@link #pollNext()} method, or the next time it calls that method.
+ *
+     * <p>After this method has been called, no call to either {@link #produce(List)} or {@link
+     * #pollNext()} will ever return normally again; they will always complete exceptionally.
+ *
+ * <p>If another exception was already reported, this method does nothing.
+ *
+ * <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
+ *
+ * @param t The exception to report.
+ */
+ public void reportError(Throwable t) {
+ checkNotNull(t);
+
+ synchronized (lock) {
+ LOG.error("Reporting error:", t);
+ // do not override the initial exception
+ if (error == null) {
+ error = t;
+ }
+ next = null;
+ lock.notifyAll();
+ }
+ }
+
+ /**
+ * Return whether there is an error.
+ *
+ * @return whether there is an error
+ */
+ public boolean hasError() {
+ return error != null;
+ }
+
+ /**
+ * Closes the handover. Both the {@link #produce(List)} method and the {@link #pollNext()} will
+ * throw a {@link ClosedException} on any currently blocking and future invocations.
+ *
+ * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
+ * that exception will not be overridden. The consumer thread will throw that exception upon
+ * calling {@link #pollNext()}, rather than the {@code ClosedException}.
+ */
+ @Override
+ public void close() {
+ synchronized (lock) {
+ next = null;
+ wakeupProducer = false;
+
+ if (error == null) {
+ error = new ClosedException();
+ }
+ lock.notifyAll();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * An exception thrown by the Handover in the {@link #pollNext()} or {@link #produce(List)}
+ * method, after the Handover was closed via {@link #close()}.
+ */
+ public static final class ClosedException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+ }
+}
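
A single-threaded sketch of the hand-over protocol; in the real pipeline the producer is
the Debezium engine thread and the consumer is the fetcher's runFetchLoop(). An empty
batch stands in for real change events here:

    import io.debezium.engine.ChangeEvent;
    import java.util.Collections;
    import java.util.List;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.Handover;
    import org.apache.kafka.connect.source.SourceRecord;

    public class HandoverExample {
        public static void main(String[] args) throws Exception {
            Handover handover = new Handover();

            // producer side: hand one batch over; this only blocks if the
            // previous batch has not been consumed yet
            handover.produce(Collections.emptyList());

            // consumer side: pick the batch up
            List<ChangeEvent<SourceRecord, SourceRecord>> batch = handover.pollNext();
            System.out.println("received a batch of " + batch.size() + " events");

            // after close(), blocked and future produce()/pollNext() calls
            // throw Handover.ClosedException
            handover.close();
        }
    }
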
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java
new file mode 100644
index 000000000..bcc48ce52
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/internal/SchemaRecord.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal;
+
+import io.debezium.document.Document;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * A SchemaRecord represents a schema change event; it contains either one {@link HistoryRecord} or
+ * one {@link TableChange}.
+ *
+ * <p>The {@link HistoryRecord} is used by {@link FlinkDatabaseHistory}, which keeps the full
+ * history of table change events for all tables, while the {@link TableChange} is used by {@link
+ * FlinkDatabaseSchemaHistory}, which keeps only the latest table change for each table.
+ */
+public class SchemaRecord {
+
+ @Nullable private final HistoryRecord historyRecord;
+
+ @Nullable private final Document tableChangeDoc;
+
+ public SchemaRecord(HistoryRecord historyRecord) {
+ this.historyRecord = historyRecord;
+ this.tableChangeDoc = null;
+ }
+
+ public SchemaRecord(Document document) {
+ if (isHistoryRecordDocument(document)) {
+ this.historyRecord = new HistoryRecord(document);
+ this.tableChangeDoc = null;
+ } else {
+ this.tableChangeDoc = document;
+ this.historyRecord = null;
+ }
+ }
+
+ @Nullable
+ public HistoryRecord getHistoryRecord() {
+ return historyRecord;
+ }
+
+ @Nullable
+ public Document getTableChangeDoc() {
+ return tableChangeDoc;
+ }
+
+ public boolean isHistoryRecord() {
+ return historyRecord != null;
+ }
+
+ public boolean isTableChangeRecord() {
+ return tableChangeDoc != null;
+ }
+
+ public Document toDocument() {
+ if (historyRecord != null) {
+ return historyRecord.document();
+ } else {
+ return tableChangeDoc;
+ }
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return DocumentWriter.defaultWriter().write(toDocument());
+ } catch (IOException e) {
+ return super.toString();
+ }
+ }
+
+ private boolean isHistoryRecordDocument(Document document) {
+ return new HistoryRecord(document).isValid();
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java
new file mode 100644
index 000000000..50430463c
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.io.Serializable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** Emits a row with physical fields and metadata fields. */
+@Internal
+public final class AppendMetadataCollector implements Collector<RowData>, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final MetadataConverter[] metadataConverters;
+
+ public transient SourceRecord inputRecord;
+ public transient Collector<RowData> outputCollector;
+
+ public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
+ this.metadataConverters = metadataConverters;
+ }
+
+ public void collect(RowData physicalRow, TableChange tableSchema) {
+ GenericRowData metaRow = new GenericRowData(metadataConverters.length);
+ for (int i = 0; i < metadataConverters.length; i++) {
+ Object meta = metadataConverters[i].read(inputRecord, tableSchema);
+ metaRow.setField(i, meta);
+ }
+ RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
+ outputCollector.collect(outRow);
+ }
+
+ @Override
+    public void collect(RowData record) {
+        // unused: rows are emitted via collect(RowData, TableChange) instead
+    }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DebeziumOptions.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DebeziumOptions.java
new file mode 100644
index 000000000..c40cfea3e
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DebeziumOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import java.util.Map;
+import java.util.Properties;
+
+/** Option utils for Debezium options. */
+public class DebeziumOptions {
+ public static final String DEBEZIUM_OPTIONS_PREFIX = "debezium.";
+
+ public static Properties getDebeziumProperties(Map<String, String> properties) {
+ final Properties debeziumProperties = new Properties();
+
+ if (hasDebeziumProperties(properties)) {
+ properties.keySet().stream()
+ .filter(key -> key.startsWith(DEBEZIUM_OPTIONS_PREFIX))
+ .forEach(
+ key -> {
+ final String value = properties.get(key);
+ final String subKey =
+ key.substring((DEBEZIUM_OPTIONS_PREFIX).length());
+ debeziumProperties.put(subKey, value);
+ });
+ }
+ return debeziumProperties;
+ }
+
+ /**
+     * Decides whether the table options contain Debezium client properties that start with the
+     * prefix 'debezium.'.
+ */
+ private static boolean hasDebeziumProperties(Map<String, String> debeziumOptions) {
+ return debeziumOptions.keySet().stream()
+ .anyMatch(k -> k.startsWith(DEBEZIUM_OPTIONS_PREFIX));
+ }
+}
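
For example, given connector options mixed with pass-through Debezium options (the option
names below are assumed examples), only the 'debezium.'-prefixed entries survive, with the
prefix stripped:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DebeziumOptions;

    public class DebeziumOptionsExample {
        public static void main(String[] args) {
            Map<String, String> options = new HashMap<>();
            options.put("hostname", "localhost");                  // connector option, filtered out
            options.put("debezium.snapshot.mode", "initial");      // passed through
            options.put("debezium.snapshot.locking.mode", "none"); // passed through

            Properties props = DebeziumOptions.getDebeziumProperties(options);
            // now contains snapshot.mode=initial and snapshot.locking.mode=none
            System.out.println(props);
        }
    }
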
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverter.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverter.java
new file mode 100644
index 000000000..0b4d47029
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import java.io.Serializable;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * Runtime converter that converts objects of Debezium into objects of Flink Table & SQL internal
+ * data structures.
+ */
+@FunctionalInterface
+public interface DeserializationRuntimeConverter extends Serializable {
+ Object convert(Object dbzObj, Schema schema) throws Exception;
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverterFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverterFactory.java
new file mode 100644
index 000000000..3d0861062
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/DeserializationRuntimeConverterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+import java.util.Optional;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/**
+ * Factory to create a {@link DeserializationRuntimeConverter} according to a {@link LogicalType}.
+ * It's usually used to create a user-defined {@link DeserializationRuntimeConverter} which has a
+ * higher resolution order than the default converters.
+ */
+public interface DeserializationRuntimeConverterFactory extends Serializable {
+
+    /** A user-defined converter factory which always falls back to the default converters. */
+ DeserializationRuntimeConverterFactory DEFAULT =
+ (logicalType, serverTimeZone) -> Optional.empty();
+
+ /**
+ * Returns an optional {@link DeserializationRuntimeConverter}. Returns {@link Optional#empty()}
+     * if it should fall back to the default converter.
+ *
+ * @param logicalType the Flink Table & SQL internal datatype to be converted from objects of
+ * Debezium
+ * @param serverTimeZone TimeZone used to convert data with timestamp type
+ */
+ Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
+ LogicalType logicalType, ZoneId serverTimeZone);
+}
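
A sketch of a user-defined factory that takes over VARCHAR conversion (upper-casing the
values, purely for illustration) and defers every other type to the default converters:

    import java.util.Optional;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.logical.LogicalTypeRoot;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DeserializationRuntimeConverter;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.DeserializationRuntimeConverterFactory;

    public class UpperCaseVarcharConverterFactory {
        public static final DeserializationRuntimeConverterFactory INSTANCE =
                (logicalType, serverTimeZone) -> {
                    if (logicalType.getTypeRoot() == LogicalTypeRoot.VARCHAR) {
                        // take over VARCHAR conversion: upper-case the value
                        return Optional.of(
                                (DeserializationRuntimeConverter)
                                        (dbzObj, schema) ->
                                                StringData.fromString(
                                                        dbzObj.toString().toUpperCase()));
                    }
                    // empty means: fall back to the default converter for this type
                    return Optional.empty();
                };
    }

Such a factory would be handed to the deserializer via
RowDataDebeziumDeserializeSchema.newBuilder().setUserDefinedConverterFactory(...).
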
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/MetadataConverter.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/MetadataConverter.java
new file mode 100644
index 000000000..8c3470841
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/MetadataConverter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import io.debezium.relational.history.TableChanges;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** A converter that converts {@link SourceRecord} metadata into Flink internal data structures. */
+@FunctionalInterface
+@Internal
+public interface MetadataConverter extends Serializable {
+ Object read(SourceRecord record);
+
+ default Object read(SourceRecord record, @Nullable TableChanges.TableChange tableSchema) {
+ return read(record);
+ }
+}
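
A sketch of a converter exposing the source table name as a metadata column; reading the
"table" key out of the Debezium source struct follows the MySQL connector's conventions
and is an assumed example here:

    import io.debezium.data.Envelope;
    import org.apache.flink.table.data.StringData;
    import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.MetadataConverter;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;

    public class TableNameMetadataConverter implements MetadataConverter {
        private static final long serialVersionUID = 1L;

        @Override
        public Object read(SourceRecord record) {
            Struct value = (Struct) record.value();
            Struct source = value.getStruct(Envelope.FieldName.SOURCE);
            // the "table" field of the source struct carries the table name
            // in Debezium's MySQL records (assumed convention)
            return StringData.fromString(source.getString("table"));
        }
    }
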
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
new file mode 100644
index 000000000..b5cea2aef
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.table;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.TemporalConversions;
+import io.debezium.data.Envelope;
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.data.VariableScaleDecimal;
+import io.debezium.relational.history.TableChanges.TableChange;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
+import io.debezium.time.NanoTimestamp;
+import io.debezium.time.Timestamp;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
+ * RowData}.
+ */
+public final class RowDataDebeziumDeserializeSchema
+ implements DebeziumDeserializationSchema<RowData> {
+ private static final long serialVersionUID = 2L;
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
+ deserialize(record, out, null);
+ }
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<RowData> out, TableChange tableChange)
+ throws Exception {
+ Envelope.Operation op = Envelope.operationFor(record);
+ Struct value = (Struct) record.value();
+ Schema valueSchema = record.valueSchema();
+ if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+ GenericRowData insert = extractAfterRow(value, valueSchema);
+ validator.validate(insert, RowKind.INSERT);
+ insert.setRowKind(RowKind.INSERT);
+ emit(record, insert, out, tableChange);
+ } else if (op == Envelope.Operation.DELETE) {
+ GenericRowData delete = extractBeforeRow(value, valueSchema);
+ validator.validate(delete, RowKind.DELETE);
+ delete.setRowKind(RowKind.DELETE);
+ emit(record, delete, out, tableChange);
+ } else {
+ GenericRowData before = extractBeforeRow(value, valueSchema);
+ validator.validate(before, RowKind.UPDATE_BEFORE);
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ emit(record, before, out, tableChange);
+
+ GenericRowData after = extractAfterRow(value, valueSchema);
+ validator.validate(after, RowKind.UPDATE_AFTER);
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ emit(record, after, out, tableChange);
+ }
+ }
+
+ /** Custom validator to validate the row value. */
+ public interface ValueValidator extends Serializable {
+ void validate(RowData rowData, RowKind rowKind) throws Exception;
+ }
+
+    /** TypeInformation of the produced {@link RowData}. */
+ private final TypeInformation<RowData> resultTypeInfo;
+
+ /**
+     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisting
+     * of physical column values.
+ */
+ private final DeserializationRuntimeConverter physicalConverter;
+
+ /** Whether the deserializer needs to handle metadata columns. */
+ private final boolean hasMetadata;
+
+ /**
+ * A wrapped output collector which is used to append metadata columns after physical columns.
+ */
+ private final AppendMetadataCollector appendMetadataCollector;
+
+ /** Validator to validate the row value. */
+ private final ValueValidator validator;
+
+ /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ RowDataDebeziumDeserializeSchema(
+ RowType physicalDataType,
+ MetadataConverter[] metadataConverters,
+ TypeInformation<RowData> resultTypeInfo,
+ ValueValidator validator,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+ this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+ this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+ this.physicalConverter =
+ createConverter(
+ checkNotNull(physicalDataType),
+ serverTimeZone,
+ userDefinedConverterFactory);
+ this.resultTypeInfo = checkNotNull(resultTypeInfo);
+ this.validator = checkNotNull(validator);
+ }
+
+ private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
+ Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
+ Struct after = value.getStruct(Envelope.FieldName.AFTER);
+ return (GenericRowData) physicalConverter.convert(after, afterSchema);
+ }
+
+ private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
+ Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
+ Struct before = value.getStruct(Envelope.FieldName.BEFORE);
+ return (GenericRowData) physicalConverter.convert(before, beforeSchema);
+ }
+
+ private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector,
+ TableChange tableChange) {
+ if (!hasMetadata) {
+ collector.collect(physicalRow);
+ return;
+ }
+
+ appendMetadataCollector.inputRecord = inRecord;
+ appendMetadataCollector.outputCollector = collector;
+ appendMetadataCollector.collect(physicalRow, tableChange);
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return resultTypeInfo;
+ }
+
+ // -------------------------------------------------------------------------------------
+ // Builder
+ // -------------------------------------------------------------------------------------
+
+ /** Builder of {@link RowDataDebeziumDeserializeSchema}. */
+ public static class Builder {
+ private RowType physicalRowType;
+ private TypeInformation<RowData> resultTypeInfo;
+ private MetadataConverter[] metadataConverters = new MetadataConverter[0];
+ private ValueValidator validator = (rowData, rowKind) -> {};
+ private ZoneId serverTimeZone = ZoneId.of("UTC");
+ private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
+ DeserializationRuntimeConverterFactory.DEFAULT;
+
+ public Builder setPhysicalRowType(RowType physicalRowType) {
+ this.physicalRowType = physicalRowType;
+ return this;
+ }
+
+ public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+ this.metadataConverters = metadataConverters;
+ return this;
+ }
+
+ public Builder setResultTypeInfo(TypeInformation<RowData> resultTypeInfo) {
+ this.resultTypeInfo = resultTypeInfo;
+ return this;
+ }
+
+ public Builder setValueValidator(ValueValidator validator) {
+ this.validator = validator;
+ return this;
+ }
+
+ public Builder setServerTimeZone(ZoneId serverTimeZone) {
+ this.serverTimeZone = serverTimeZone;
+ return this;
+ }
+
+ public Builder setUserDefinedConverterFactory(
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+ this.userDefinedConverterFactory = userDefinedConverterFactory;
+ return this;
+ }
+
+ public RowDataDebeziumDeserializeSchema build() {
+ return new RowDataDebeziumDeserializeSchema(
+ physicalRowType,
+ metadataConverters,
+ resultTypeInfo,
+ validator,
+ serverTimeZone,
+ userDefinedConverterFactory);
+ }
+ }
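+
+ // How a schema is typically assembled via the builder (a sketch; the
+ // two-column row type is hypothetical, and InternalTypeInfo comes from
+ // org.apache.flink.table.runtime.typeutils):
+ //
+ // RowType rowType = RowType.of(
+ //         new LogicalType[] {new BigIntType(), new VarCharType()},
+ //         new String[] {"id", "name"});
+ // RowDataDebeziumDeserializeSchema schema =
+ //         RowDataDebeziumDeserializeSchema.newBuilder()
+ //                 .setPhysicalRowType(rowType)
+ //                 .setResultTypeInfo(InternalTypeInfo.of(rowType))
+ //                 .setServerTimeZone(ZoneId.of("UTC"))
+ //                 .build();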
+
+ // -------------------------------------------------------------------------------------
+ // Runtime Converters
+ // -------------------------------------------------------------------------------------
+
+ /** Creates a runtime converter which is null safe. */
+ private static DeserializationRuntimeConverter createConverter(
+ LogicalType type,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+ return wrapIntoNullableConverter(
+ createNotNullConverter(type, serverTimeZone, userDefinedConverterFactory));
+ }
+
+ // --------------------------------------------------------------------------------
+ // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+ // necessary because the maven shade plugin cannot relocate classes in
+ // SerializedLambdas (MSHADE-260).
+ // --------------------------------------------------------------------------------
+
+ /** Creates a runtime converter that assumes the input object is not null. */
+ public static DeserializationRuntimeConverter createNotNullConverter(
+ LogicalType type,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+ // a user-defined converter takes precedence over the default converters
+ Optional<DeserializationRuntimeConverter> converter =
+ userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone);
+ if (converter.isPresent()) {
+ return converter.get();
+ }
+
+ // if no user-defined converter matches, fall back to the default converters
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return null;
+ }
+ };
+ case BOOLEAN:
+ return convertToBoolean();
+ case TINYINT:
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return Byte.parseByte(dbzObj.toString());
+ }
+ };
+ case SMALLINT:
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return Short.parseShort(dbzObj.toString());
+ }
+ };
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ return convertToInt();
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ return convertToLong();
+ case DATE:
+ return convertToDate();
+ case TIME_WITHOUT_TIME_ZONE:
+ return convertToTime();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return convertToTimestamp(serverTimeZone);
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return convertToLocalTimeZoneTimestamp(serverTimeZone);
+ case FLOAT:
+ return convertToFloat();
+ case DOUBLE:
+ return convertToDouble();
+ case CHAR:
+ case VARCHAR:
+ return convertToString();
+ case BINARY:
+ case VARBINARY:
+ return convertToBinary();
+ case DECIMAL:
+ return createDecimalConverter((DecimalType) type);
+ case ROW:
+ return createRowConverter(
+ (RowType) type, serverTimeZone, userDefinedConverterFactory);
+ case ARRAY:
+ case MAP:
+ case MULTISET:
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
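+
+ // Because the user-defined factory is consulted before the switch above, a
+ // caller can override the conversion of individual types. An illustrative
+ // sketch that reads TINYINT columns through the string converter instead:
+ //
+ // DeserializationRuntimeConverterFactory factory =
+ //         new DeserializationRuntimeConverterFactory() {
+ //             private static final long serialVersionUID = 1L;
+ //
+ //             @Override
+ //             public Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
+ //                     LogicalType type, ZoneId serverTimeZone) {
+ //                 if (type.getTypeRoot() != LogicalTypeRoot.TINYINT) {
+ //                     return Optional.empty();
+ //                 }
+ //                 return Optional.of(convertToString());
+ //             }
+ //         };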
+
+ private static DeserializationRuntimeConverter convertToBoolean() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Boolean) {
+ return dbzObj;
+ } else if (dbzObj instanceof Byte) {
+ return (byte) dbzObj == 1;
+ } else if (dbzObj instanceof Short) {
+ return (short) dbzObj == 1;
+ } else {
+ return Boolean.parseBoolean(dbzObj.toString());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToInt() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Integer) {
+ return dbzObj;
+ } else if (dbzObj instanceof Long) {
+ return ((Long) dbzObj).intValue();
+ } else {
+ return Integer.parseInt(dbzObj.toString());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToLong() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Integer) {
+ return ((Integer) dbzObj).longValue();
+ } else if (dbzObj instanceof Long) {
+ return dbzObj;
+ } else {
+ return Long.parseLong(dbzObj.toString());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToDouble() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Float) {
+ return ((Float) dbzObj).doubleValue();
+ } else if (dbzObj instanceof Double) {
+ return dbzObj;
+ } else {
+ return Double.parseDouble(dbzObj.toString());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToFloat() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Float) {
+ return dbzObj;
+ } else if (dbzObj instanceof Double) {
+ return ((Double) dbzObj).floatValue();
+ } else {
+ return Float.parseFloat(dbzObj.toString());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToDate() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToTime() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Long) {
+ switch (schema.name()) {
+ case MicroTime.SCHEMA_NAME:
+ return (int) ((long) dbzObj / 1000);
+ case NanoTime.SCHEMA_NAME:
+ return (int) ((long) dbzObj / 1000_000);
+ }
+ } else if (dbzObj instanceof Integer) {
+ return dbzObj;
+ }
+ // fall back to the number of milliseconds of the day (second precision)
+ return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
+ }
+ };
+ }
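+
+ // Worked example (illustrative): MySQL TIME '09:30:15' arrives as
+ // io.debezium.time.MicroTime, i.e. 34215000000L microseconds of the day,
+ // which the converter above maps to 34215000 milliseconds of the day.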
+
+ private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof Long) {
+ switch (schema.name()) {
+ case Timestamp.SCHEMA_NAME:
+ return TimestampData.fromEpochMillis((Long) dbzObj);
+ case MicroTimestamp.SCHEMA_NAME:
+ long micro = (long) dbzObj;
+ return TimestampData.fromEpochMillis(
+ micro / 1000, (int) (micro % 1000 * 1000));
+ case NanoTimestamp.SCHEMA_NAME:
+ long nano = (long) dbzObj;
+ return TimestampData.fromEpochMillis(
+ nano / 1000_000, (int) (nano % 1000_000));
+ }
+ }
+ LocalDateTime localDateTime =
+ TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
+ return TimestampData.fromLocalDateTime(localDateTime);
+ }
+ };
+ }
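+
+ // Worked example (illustrative): a MicroTimestamp of 1650940253123456L
+ // (2022-04-26T02:30:53.123456Z) is split into 1650940253123L epoch
+ // milliseconds plus 456000 nanoseconds of the millisecond, i.e.
+ // TimestampData.fromEpochMillis(1650940253123L, 456000).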
+
+ private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
+ ZoneId serverTimeZone) {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof String) {
+ String str = (String) dbzObj;
+ // TIMESTAMP_LTZ values are encoded as strings
+ Instant instant = Instant.parse(str);
+ return TimestampData.fromLocalDateTime(
+ LocalDateTime.ofInstant(instant, serverTimeZone));
+ }
+ throw new IllegalArgumentException(
+ "Unable to convert to TimestampData from unexpected value '"
+ + dbzObj
+ + "' of type "
+ + dbzObj.getClass().getName());
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToString() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return StringData.fromString(dbzObj.toString());
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter convertToBinary() {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof byte[]) {
+ return dbzObj;
+ } else if (dbzObj instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return bytes;
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
+ }
+ }
+ };
+ }
+
+ private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
+ final int precision = decimalType.getPrecision();
+ final int scale = decimalType.getScale();
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ BigDecimal bigDecimal;
+ if (dbzObj instanceof byte[]) {
+ // decimal.handling.mode=precise
+ bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
+ } else if (dbzObj instanceof String) {
+ // decimal.handling.mode=string
+ bigDecimal = new BigDecimal((String) dbzObj);
+ } else if (dbzObj instanceof Double) {
+ // decimal.handling.mode=double
+ bigDecimal = BigDecimal.valueOf((Double) dbzObj);
+ } else {
+ if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
+ SpecialValueDecimal decimal =
+ VariableScaleDecimal.toLogical((Struct) dbzObj);
+ bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
+ } else {
+ // fallback to string
+ bigDecimal = new BigDecimal(dbzObj.toString());
+ }
+ }
+ return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+ }
+ };
+ }
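+
+ // Worked example (illustrative): with decimal.handling.mode=string and a
+ // column declared DECIMAL(10, 4), the payload "123.4567" is converted via
+ // DecimalData.fromBigDecimal(new BigDecimal("123.4567"), 10, 4).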
+
+ private static DeserializationRuntimeConverter createRowConverter(
+ RowType rowType,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+ final DeserializationRuntimeConverter[] fieldConverters =
+ rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .map(
+ logicType ->
+ createConverter(
+ logicType,
+ serverTimeZone,
+ userDefinedConverterFactory))
+ .toArray(DeserializationRuntimeConverter[]::new);
+ final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) throws Exception {
+ Struct struct = (Struct) dbzObj;
+ int arity = fieldNames.length;
+ GenericRowData row = new GenericRowData(arity);
+ for (int i = 0; i < arity; i++) {
+ String fieldName = fieldNames[i];
+ Field field = schema.field(fieldName);
+ if (field == null) {
+ row.setField(i, null);
+ } else {
+ Object fieldValue = struct.getWithoutDefault(fieldName);
+ Schema fieldSchema = schema.field(fieldName).schema();
+ Object convertedField =
+ convertField(fieldConverters[i], fieldValue, fieldSchema);
+ row.setField(i, convertedField);
+ }
+ }
+ return row;
+ }
+ };
+ }
+
+ private static Object convertField(
+ DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
+ throws Exception {
+ if (fieldValue == null) {
+ return null;
+ } else {
+ return fieldConverter.convert(fieldValue, fieldSchema);
+ }
+ }
+
+ private static DeserializationRuntimeConverter wrapIntoNullableConverter(
+ DeserializationRuntimeConverter converter) {
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) throws Exception {
+ if (dbzObj == null) {
+ return null;
+ }
+ return converter.convert(dbzObj, schema);
+ }
+ };
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java
new file mode 100644
index 000000000..b00d12024
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/DatabaseHistoryUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils;
+
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.inlong.sort.singletenant.flink.cdc.debezium.internal.SchemaRecord;
+import io.debezium.relational.history.DatabaseHistory;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility to safely share the schema history between {@link DatabaseHistory} and {@link
+ * DebeziumSourceFunction}.
+ */
+public class DatabaseHistoryUtil {
+
+ private DatabaseHistoryUtil() {
+ // do nothing
+ }
+
+ /**
+ * Structure to maintain the current schema history. The content of a {@link SchemaRecord}
+ * depends on the implementation of the {@link DatabaseHistory}.
+ */
+ private static final Map<String, Collection<SchemaRecord>> HISTORY = new HashMap<>();
+
+ /**
+ * The schema history will be cleaned up once {@link DatabaseHistory#stop()} is called; any
+ * checkpoint taken after that should fail.
+ */
+ private static final Map<String, Boolean> HISTORY_CLEANUP_STATUS = new HashMap<>();
+
+ /** Registers the schema history safely. */
+ public static void registerHistory(String engineName, Collection<SchemaRecord> engineHistory) {
+ synchronized (HISTORY) {
+ HISTORY.put(engineName, engineHistory);
+ HISTORY_CLEANUP_STATUS.put(engineName, false);
+ }
+ }
+
+ /** Removes the schema history safely. */
+ public static void removeHistory(String engineName) {
+ synchronized (HISTORY) {
+ HISTORY_CLEANUP_STATUS.put(engineName, true);
+ HISTORY.remove(engineName);
+ }
+ }
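+
+ // Typical lifecycle (a sketch; the engine name and collection type are
+ // illustrative):
+ //
+ // DatabaseHistoryUtil.registerHistory("mysql-engine-1", new ConcurrentLinkedQueue<>());
+ // Collection<SchemaRecord> records = DatabaseHistoryUtil.retrieveHistory("mysql-engine-1");
+ // DatabaseHistoryUtil.removeHistory("mysql-engine-1");
+ // // a later retrieveHistory("mysql-engine-1") now throws IllegalStateException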
+
+ /**
+ * Retrieves the schema history safely. This method first checks the cleanup status of the
+ * given engine and returns the schema history if it has not been cleaned up yet; otherwise it
+ * throws an {@link IllegalStateException}.
+ */
+ public static Collection<SchemaRecord> retrieveHistory(String engineName) {
+ synchronized (HISTORY) {
+ if (Boolean.TRUE.equals(HISTORY_CLEANUP_STATUS.get(engineName))) {
+ throw new IllegalStateException(
+ String.format(
+ "Retrieve schema history failed, the schema records for engine %s has been removed,"
+ + " this might because the debezium engine "
+ + "has been shutdown due to other errors.",
+ engineName));
+ } else {
+ return HISTORY.getOrDefault(engineName, Collections.emptyList());
+ }
+ }
+ }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/TemporalConversions.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/TemporalConversions.java
new file mode 100644
index 000000000..24f55b259
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/utils/TemporalConversions.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.concurrent.TimeUnit;
+
+/** Utilities and constants for temporal conversions. */
+public final class TemporalConversions {
+
+ static final long MILLISECONDS_PER_SECOND = TimeUnit.SECONDS.toMillis(1);
+ static final long MICROSECONDS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
+ static final long MICROSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toMicros(1);
+ static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
+ static final long NANOSECONDS_PER_MICROSECOND = TimeUnit.MICROSECONDS.toNanos(1);
+ static final long NANOSECONDS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+ static final long NANOSECONDS_PER_DAY = TimeUnit.DAYS.toNanos(1);
+ static final long SECONDS_PER_DAY = TimeUnit.DAYS.toSeconds(1);
+ static final long MICROSECONDS_PER_DAY = TimeUnit.DAYS.toMicros(1);
+ static final LocalDate EPOCH = LocalDate.ofEpochDay(0);
+
+ private TemporalConversions() {
+ }
+
+ public static LocalDate toLocalDate(Object obj) {
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof LocalDate) {
+ return (LocalDate) obj;
+ }
+ if (obj instanceof LocalDateTime) {
+ return ((LocalDateTime) obj).toLocalDate();
+ }
+ if (obj instanceof java.sql.Date) {
+ return ((java.sql.Date) obj).toLocalDate();
+ }
+ if (obj instanceof java.sql.Time) {
+ throw new IllegalArgumentException(
+ "Unable to convert to LocalDate from a java.sql.Time value '" + obj + "'");
+ }
+ if (obj instanceof java.util.Date) {
+ java.util.Date date = (java.util.Date) obj;
+ return LocalDate.of(date.getYear() + 1900, date.getMonth() + 1, date.getDate());
+ }
+ if (obj instanceof Long) {
+ // Assume the value is the epoch day number
+ return LocalDate.ofEpochDay((Long) obj);
+ }
+ if (obj instanceof Integer) {
+ // Assume the value is the epoch day number
+ return LocalDate.ofEpochDay((Integer) obj);
+ }
+ throw new IllegalArgumentException(
+ "Unable to convert to LocalDate from unexpected value '"
+ + obj
+ + "' of type "
+ + obj.getClass().getName());
+ }
+
+ public static LocalTime toLocalTime(Object obj) {
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof LocalTime) {
+ return (LocalTime) obj;
+ }
+ if (obj instanceof LocalDateTime) {
+ return ((LocalDateTime) obj).toLocalTime();
+ }
+ if (obj instanceof java.sql.Date) {
+ throw new IllegalArgumentException(
+ "Unable to convert to LocalDate from a java.sql.Date value '" + obj + "'");
+ }
+ if (obj instanceof java.sql.Time) {
+ java.sql.Time time = (java.sql.Time) obj;
+ long millis = time.getTime() % MILLISECONDS_PER_SECOND;
+ int nanosOfSecond = (int) (millis * NANOSECONDS_PER_MILLISECOND);
+ return LocalTime.of(
+ time.getHours(), time.getMinutes(), time.getSeconds(), nanosOfSecond);
+ }
+ if (obj instanceof java.sql.Timestamp) {
+ java.sql.Timestamp timestamp = (java.sql.Timestamp) obj;
+ return LocalTime.of(
+ timestamp.getHours(),
+ timestamp.getMinutes(),
+ timestamp.getSeconds(),
+ timestamp.getNanos());
+ }
+ if (obj instanceof java.util.Date) {
+ java.util.Date date = (java.util.Date) obj;
+ long millis = date.getTime() % MILLISECONDS_PER_SECOND;
+ int nanosOfSecond = (int) (millis * NANOSECONDS_PER_MILLISECOND);
+ return LocalTime.of(
+ date.getHours(), date.getMinutes(), date.getSeconds(), nanosOfSecond);
+ }
+ if (obj instanceof Duration) {
+ long value = ((Duration) obj).toNanos();
+ if (value >= 0 && value <= NANOSECONDS_PER_DAY) {
+ return LocalTime.ofNanoOfDay(value);
+ } else {
+ throw new IllegalArgumentException(
+ "Time values must be a number of nanoseconds between 0 and 86400000000000 (one day)");
+ }
+ }
+ throw new IllegalArgumentException(
+ "Unable to convert to LocalTime from unexpected value '"
+ + obj
+ + "' of type "
+ + obj.getClass().getName());
+ }
+
+ public static LocalDateTime toLocalDateTime(Object obj, ZoneId serverTimeZone) {
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof OffsetDateTime) {
+ return ((OffsetDateTime) obj).toLocalDateTime();
+ }
+ if (obj instanceof Instant) {
+ return ((Instant) obj).atOffset(ZoneOffset.UTC).toLocalDateTime();
+ }
+ if (obj instanceof LocalDateTime) {
+ return (LocalDateTime) obj;
+ }
+ if (obj instanceof LocalDate) {
+ LocalDate date = (LocalDate) obj;
+ return LocalDateTime.of(date, LocalTime.MIDNIGHT);
+ }
+ if (obj instanceof LocalTime) {
+ LocalTime time = (LocalTime) obj;
+ return LocalDateTime.of(EPOCH, time);
+ }
+ if (obj instanceof java.sql.Date) {
+ java.sql.Date sqlDate = (java.sql.Date) obj;
+ LocalDate date = sqlDate.toLocalDate();
+ return LocalDateTime.of(date, LocalTime.MIDNIGHT);
+ }
+ if (obj instanceof java.sql.Time) {
+ LocalTime localTime = toLocalTime(obj);
+ return LocalDateTime.of(EPOCH, localTime);
+ }
+ if (obj instanceof java.sql.Timestamp) {
+ java.sql.Timestamp timestamp = (java.sql.Timestamp) obj;
+ return LocalDateTime.of(
+ timestamp.getYear() + 1900,
+ timestamp.getMonth() + 1,
+ timestamp.getDate(),
+ timestamp.getHours(),
+ timestamp.getMinutes(),
+ timestamp.getSeconds(),
+ timestamp.getNanos());
+ }
+ if (obj instanceof java.util.Date) {
+ java.util.Date date = (java.util.Date) obj;
+ long millis = date.getTime() % MILLISECONDS_PER_SECOND;
+ if (millis < 0) {
+ millis = MILLISECONDS_PER_SECOND + millis;
+ }
+ int nanosOfSecond = (int) (millis * NANOSECONDS_PER_MILLISECOND);
+ return LocalDateTime.of(
+ date.getYear() + 1900,
+ date.getMonth() + 1,
+ date.getDate(),
+ date.getHours(),
+ date.getMinutes(),
+ date.getSeconds(),
+ nanosOfSecond);
+ }
+ if (obj instanceof String) {
+ String str = (String) obj;
+ // TIMESTAMP values are encoded as strings
+ Instant instant = Instant.parse(str);
+ return LocalDateTime.ofInstant(instant, serverTimeZone);
+ }
+ throw new IllegalArgumentException(
+ "Unable to convert to LocalDateTime from unexpected value '"
+ + obj
+ + "' of type "
+ + obj.getClass().getName());
+ }
+
+ public static long toEpochMicros(Instant instant) {
+ return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+ + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
+ }
+
+ public static Instant toInstantFromMicros(long microsSinceEpoch) {
+ return Instant.ofEpochSecond(
+ TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch),
+ TimeUnit.MICROSECONDS.toNanos(microsSinceEpoch % TimeUnit.SECONDS.toMicros(1)));
+ }
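+
+ // Round-trip example (illustrative): an instant with microsecond precision
+ // survives toEpochMicros/toInstantFromMicros unchanged.
+ //
+ // Instant original = Instant.parse("2022-04-26T02:30:53.123456Z");
+ // long micros = toEpochMicros(original); // 1650940253123456L
+ // Instant restored = toInstantFromMicros(micros);
+ // assert original.equals(restored);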
+}
diff --git a/pom.xml b/pom.xml
index 9280ea395..dc8af3bae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,6 +172,9 @@
<opencsv.version>5.4</opencsv.version>
<javax.servlet.api.version>4.0.1</javax.servlet.api.version>
+ <flink-table-planner-blink_2.11.version>1.13.5</flink-table-planner-blink_2.11.version>
+ <flink-connector-mysql-cdc.version>2.0.2</flink-connector-mysql-cdc.version>
+
<gson.version>2.8.6</gson.version>
<jackson.version>2.13.2</jackson.version>
<jackson.databind.version>2.13.2.2</jackson.databind.version>
@@ -224,6 +227,8 @@
<jcommander.version>1.78</jcommander.version>
<je.version>7.3.7</je.version>
<tencentcloud.cls.version>1.0.5</tencentcloud.cls.version>
+ <esri-geometry-api.version>2.0.0</esri-geometry-api.version>
+ <HikariCP.version>4.0.3</HikariCP.version>
</properties>
<dependencyManagement>
@@ -239,6 +244,16 @@
<artifactId>flume-ng-node</artifactId>
<version>${flume.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-connector-mysql-cdc</artifactId>
+ <version>${flink-connector-mysql-cdc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_2.11</artifactId>
+ <version>${flink-table-planner-blink_2.11.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
@@ -249,6 +264,17 @@
<artifactId>flume-ng-configuration</artifactId>
<version>${flume.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ <version>${HikariCP.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-hdfs-sink</artifactId>
@@ -1274,7 +1300,11 @@
<artifactId>jcommander</artifactId>
<version>${jcommander.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>com.esri.geometry</groupId>
+ <artifactId>esri-geometry-api</artifactId>
+ <version>${esri-geometry-api.version}</version>
+ </dependency>
<dependency>
<groupId>com.tencentcloudapi.cls</groupId>
<artifactId>tencentcloud-cls-sdk-java</artifactId>