You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/15 02:46:42 UTC
[inlong] branch master updated: [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group (#5329)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4885bfd37 [INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group (#5329)
4885bfd37 is described below
commit 4885bfd373da02894b6a68d6132ed5705214ed8d
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Mon Aug 15 10:46:37 2022 +0800
[INLONG-5156][Sort] Add metric for SQLServer source with Flink metrics group (#5329)
---
inlong-sort/sort-connectors/sqlserver-cdc/pom.xml | 11 +
.../inlong/sort/cdc/sqlserver/SqlServerSource.java | 174 ++++++
.../sqlserver/table/DebeziumSourceFunction.java | 658 +++++++++++++++++++++
.../cdc/sqlserver/table/SqlServerTableFactory.java | 196 ++++++
.../cdc/sqlserver/table/SqlServerTableSource.java | 257 ++++++++
.../org.apache.flink.table.factories.Factory | 16 +
licenses/inlong-sort-connectors/LICENSE | 7 +
7 files changed, 1319 insertions(+)
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/pom.xml b/inlong-sort/sort-connectors/sqlserver-cdc/pom.xml
index 1127b9f45..4073341e8 100644
--- a/inlong-sort/sort-connectors/sqlserver-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/pom.xml
@@ -36,6 +36,11 @@
<groupId>com.ververica</groupId>
<artifactId>flink-connector-sqlserver-cdc</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
@@ -53,6 +58,7 @@
<configuration>
<artifactSet>
<includes>
+ <include>org.apache.inlong:*</include>
<include>io.debezium:debezium-api</include>
<include>io.debezium:debezium-embedded</include>
<include>io.debezium:debezium-core</include>
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/SqlServerSource.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/SqlServerSource.java
new file mode 100644
index 000000000..3dbdd769b
--- /dev/null
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/SqlServerSource.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.sqlserver;
+
+import com.ververica.cdc.connectors.sqlserver.SqlServerValidator;
+import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
+import org.apache.inlong.sort.cdc.sqlserver.table.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.connector.sqlserver.SqlServerConnector;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction which can read snapshot and continue to consume transaction
+ * log for SqlServer.
+ */
+public class SqlServerSource {
+
+    private static final String DATABASE_SERVER_NAME = "sqlserver_transaction_log_source";
+
+    /**
+     * Creates a new builder for a SQL Server change-data-capture source function.
+     *
+     * @param <T> type of the records produced by the configured deserializer
+     * @return a fresh builder (port 1433 and {@code initial} startup mode by default)
+     */
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /** Builder class of {@link SqlServerSource}. */
+    public static class Builder<T> {
+
+        private int port = 1433; // default 1433 port
+        private String hostname;
+        private String database;
+        private String username;
+        private String password;
+        private String[] tableList;
+        private Properties dbzProperties;
+        private StartupOptions startupOptions = StartupOptions.initial();
+        private DebeziumDeserializationSchema<T> deserializer;
+        private String inlongMetric;
+        private String auditHostAndPorts;
+
+        /** Hostname of the SQL Server database server (required). */
+        public Builder<T> hostname(String hostname) {
+            this.hostname = hostname;
+            return this;
+        }
+
+        /** Integer port number of the SQL Server database server. */
+        public Builder<T> port(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /** The name of the SQL Server database from which to stream the changes. */
+        public Builder<T> database(String database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * An optional comma-separated list of regular expressions that match fully-qualified table
+         * identifiers for tables that you want Debezium to capture; any table that is not included
+         * in table.include.list is excluded from capture. Each identifier is of the form
+         * schemaName.tableName. By default, the connector captures all non-system tables for the
+         * designated schemas. Must not be used with table.exclude.list.
+         */
+        public Builder<T> tableList(String... tableList) {
+            this.tableList = tableList;
+            return this;
+        }
+
+        /** Username to use when connecting to the SQL Server database server. */
+        public Builder<T> username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        /** Password to use when connecting to the SQL Server database server. */
+        public Builder<T> password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
+         * The Debezium SqlServer connector properties. For example, "snapshot.mode". These are
+         * applied last in {@link #build()} and therefore override the builder-derived settings.
+         */
+        public Builder<T> debeziumProperties(Properties properties) {
+            this.dbzProperties = properties;
+            return this;
+        }
+
+        /**
+         * The deserializer used to convert from consumed {@link
+         * org.apache.kafka.connect.source.SourceRecord} (required).
+         */
+        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        /**
+         * Optional InLong metric label, forwarded to {@link DebeziumSourceFunction}, which splits
+         * it into group id, stream id and node id.
+         */
+        public Builder<T> inlongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric;
+            return this;
+        }
+
+        /**
+         * Optional list of InLong audit proxy addresses, forwarded to {@link
+         * DebeziumSourceFunction}.
+         */
+        public Builder<T> auditHostAndPorts(String auditHostAndPorts) {
+            this.auditHostAndPorts = auditHostAndPorts;
+            return this;
+        }
+
+        /** Specifies the startup options. */
+        public Builder<T> startupOptions(StartupOptions startupOptions) {
+            this.startupOptions = startupOptions;
+            return this;
+        }
+
+        /**
+         * Builds the source function from the configured settings.
+         *
+         * @return a new {@link DebeziumSourceFunction} reading from SQL Server
+         * @throws NullPointerException if hostname, username, password, database, startup options
+         *     or deserializer were not configured
+         * @throws UnsupportedOperationException if the startup mode has no SQL Server snapshot
+         *     mode equivalent
+         */
+        public DebeziumSourceFunction<T> build() {
+            Properties props = new Properties();
+            props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
+            // hard code server name, because we don't need to distinguish it, docs:
+            // Logical name that identifies and provides a namespace for the SQL Server database
+            // server that you want Debezium to capture. The logical name should be unique across
+            // all other connectors, since it is used as a prefix for all Kafka topic names
+            // emanating from this connector. Only alphanumeric characters and underscores should be
+            // used.
+            props.setProperty("database.server.name", DATABASE_SERVER_NAME);
+            props.setProperty("database.hostname", checkNotNull(hostname, "hostname must be set"));
+            props.setProperty("database.user", checkNotNull(username, "username must be set"));
+            props.setProperty("database.password", checkNotNull(password, "password must be set"));
+            props.setProperty("database.port", String.valueOf(port));
+            props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
+            props.setProperty("database.dbname", checkNotNull(database, "database must be set"));
+
+            if (tableList != null) {
+                props.setProperty("table.include.list", String.join(",", tableList));
+            }
+
+            props.setProperty(
+                    "snapshot.mode",
+                    toSnapshotMode(checkNotNull(startupOptions, "startupOptions must be set")));
+
+            // user-supplied Debezium properties win over everything derived above
+            if (dbzProperties != null) {
+                props.putAll(dbzProperties);
+            }
+
+            // fail fast here instead of with an NPE once the job is already running
+            checkNotNull(deserializer, "deserializer must be set");
+
+            return new DebeziumSourceFunction<>(
+                    deserializer, props, null, new SqlServerValidator(props),
+                    inlongMetric, auditHostAndPorts);
+        }
+
+        /** Maps a Flink CDC startup mode onto the Debezium SQL Server {@code snapshot.mode}. */
+        private static String toSnapshotMode(StartupOptions options) {
+            switch (options.startupMode) {
+                case INITIAL:
+                    return "initial";
+                case INITIAL_ONLY:
+                    return "initial_only";
+                case LATEST_OFFSET:
+                    // only capture the schema, then stream changes from the latest position
+                    return "schema_only";
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported startup mode for SQL Server source: "
+                                    + options.startupMode);
+            }
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
new file mode 100644
index 000000000..c1cea9dba
--- /dev/null
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.sqlserver.table;
+
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.Validator;
+import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
+import com.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
+import com.ververica.cdc.debezium.internal.Handover;
+import com.ververica.cdc.debezium.internal.SchemaRecord;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
+ * from databases into Flink.
+ *
+ * <p>Please refer to Debezium's documentation for the available configuration properties:
+ * https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties
+ */
+@PublicEvolving
+public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
+ implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
+
+ private static final long serialVersionUID = -5808108641062931623L;
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
+ /**
+ * State name of the consumer's partition offset states.
+ */
+ public static final String OFFSETS_STATE_NAME = "offset-states";
+
+ /**
+ * State name of the consumer's history records state.
+ */
+ public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
+ /**
+ * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
+ */
+ public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+ /**
+ * The configuration represents the Debezium MySQL Connector uses the legacy implementation or
+ * not.
+ */
+ public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
+ /**
+ * The configuration value represents legacy implementation.
+ */
+ public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+
+ // ---------------------------------------------------------------------------------------
+ // Properties
+ // ---------------------------------------------------------------------------------------
+
+ /**
+ * The schema to convert from Debezium's messages into Flink's objects.
+ */
+ private final DebeziumDeserializationSchema<T> deserializer;
+
+ /**
+ * User-supplied properties for Kafka. *
+ */
+ private final Properties properties;
+
+ /**
+ * The specific binlog offset to read from when the first startup.
+ */
+ private final @Nullable
+ DebeziumOffset specificOffset;
+
+ /**
+ * Data for pending but uncommitted offsets.
+ */
+ private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+ /**
+ * Flag indicating whether the Debezium Engine is started.
+ */
+ private volatile boolean debeziumStarted = false;
+
+ /**
+ * Validator to validate the connected database satisfies the cdc connector's requirements.
+ */
+ private final Validator validator;
+
+ // ---------------------------------------------------------------------------------------
+ // State
+ // ---------------------------------------------------------------------------------------
+
+ /**
+ * The offsets to restore to, if the consumer restores state from a checkpoint.
+ *
+ * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+ * method.
+ *
+ * <p>Using a String because we are encoding the offset state in JSON bytes.
+ */
+ private transient volatile String restoredOffsetState;
+
+ /**
+ * Accessor for state in the operator state backend.
+ */
+ private transient ListState<byte[]> offsetState;
+
+ /**
+ * State to store the history records, i.e. schema changes.
+ *
+ * @see FlinkDatabaseHistory
+ * @see FlinkDatabaseSchemaHistory
+ */
+ private transient ListState<String> schemaRecordsState;
+
+ // ---------------------------------------------------------------------------------------
+ // Worker
+ // ---------------------------------------------------------------------------------------
+
+ private transient ExecutorService executor;
+ private transient DebeziumEngine<?> engine;
+ /**
+ * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
+ * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
+ */
+ private transient String engineInstanceName;
+
+ /**
+ * Consume the events from the engine and commit the offset to the engine.
+ */
+ private transient DebeziumChangeConsumer changeConsumer;
+
+ /**
+ * The consumer to fetch records from {@link Handover}.
+ */
+ private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
+
+ /**
+ * Buffer the events from the source and record the errors from the debezium.
+ */
+ private transient Handover handover;
+
+ private String inlongMetric;
+
+ private SourceMetricData metricData;
+
+ private String inLongGroupId;
+
+ private String auditHostAndPorts;
+
+ private String inLongStreamId;
+
+ private transient AuditImp auditImp;
+
+ // ---------------------------------------------------------------------------------------
+
+ public DebeziumSourceFunction(
+ DebeziumDeserializationSchema<T> deserializer,
+ Properties properties,
+ @Nullable DebeziumOffset specificOffset,
+ Validator validator, String inlongMetric,
+ String auditHostAndPorts) {
+ this.deserializer = deserializer;
+ this.properties = properties;
+ this.specificOffset = specificOffset;
+ this.validator = validator;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ validator.validate();
+ super.open(parameters);
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
+ this.executor = Executors.newSingleThreadExecutor(threadFactory);
+ this.handover = new Handover();
+ this.changeConsumer = new DebeziumChangeConsumer(handover);
+ }
+
+ // ------------------------------------------------------------------------
+ // Checkpoint and restore
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ OperatorStateStore stateStore = context.getOperatorStateStore();
+ this.offsetState =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ OFFSETS_STATE_NAME,
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+ this.schemaRecordsState =
+ stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
+
+ if (context.isRestored()) {
+ restoreOffsetState();
+ restoreHistoryRecordsState();
+ } else {
+ if (specificOffset != null) {
+ byte[] serializedOffset =
+ DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+ restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+ LOG.info(
+ "Consumer subtask {} starts to read from specified offset {}.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ restoredOffsetState);
+ } else {
+ LOG.info(
+ "Consumer subtask {} has no restore state.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+ }
+
+ private void restoreOffsetState() throws Exception {
+ for (byte[] serializedOffset : offsetState.get()) {
+ if (restoredOffsetState == null) {
+ restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+ } else {
+ throw new RuntimeException(
+ "Debezium Source only support single task, "
+ + "however, this is restored from multiple tasks.");
+ }
+ }
+ LOG.info(
+ "Consumer subtask {} restored offset state: {}.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ restoredOffsetState);
+ }
+
+ private void restoreHistoryRecordsState() throws Exception {
+ DocumentReader reader = DocumentReader.defaultReader();
+ ConcurrentLinkedQueue<SchemaRecord> historyRecords = new ConcurrentLinkedQueue<>();
+ int recordsCount = 0;
+ boolean firstEntry = true;
+ for (String record : schemaRecordsState.get()) {
+ if (firstEntry) {
+ // we store the engine instance name in the first element
+ this.engineInstanceName = record;
+ firstEntry = false;
+ } else {
+ // Put the records into the state. The database history should read, reorganize and
+ // register the state.
+ historyRecords.add(new SchemaRecord(reader.read(record)));
+ recordsCount++;
+ }
+ }
+ if (engineInstanceName != null) {
+ registerHistory(engineInstanceName, historyRecords);
+ }
+ LOG.info(
+ "Consumer subtask {} restored history records state: {} with {} records.",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ engineInstanceName,
+ recordsCount);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ if (handover.hasError()) {
+ LOG.debug("snapshotState() called on closed source");
+ throw new FlinkRuntimeException(
+ "Call snapshotState() on closed source, checkpoint failed.");
+ } else {
+ snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+ snapshotHistoryRecordsState();
+ }
+ }
+
+ private void snapshotOffsetState(long checkpointId) throws Exception {
+ offsetState.clear();
+
+ final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
+
+ byte[] serializedOffset = null;
+ if (fetcher == null) {
+ // the fetcher has not yet been initialized, which means we need to return the
+ // originally restored offsets
+ if (restoredOffsetState != null) {
+ serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+ }
+ } else {
+ byte[] currentState = fetcher.snapshotCurrentState();
+ if (currentState == null && restoredOffsetState != null) {
+ // the fetcher has been initialized, but has not yet received any data,
+ // which means we need to return the originally restored offsets.
+ serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+ } else {
+ serializedOffset = currentState;
+ }
+ }
+
+ if (serializedOffset != null) {
+ offsetState.add(serializedOffset);
+ // the map cannot be asynchronously updated, because only one checkpoint call
+ // can happen on this function at a time: either snapshotState() or
+ // notifyCheckpointComplete()
+ pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+ // truncate the map of pending offsets to commit, to prevent infinite growth
+ while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+ pendingOffsetsToCommit.remove(0);
+ }
+ }
+ }
+
+ private void snapshotHistoryRecordsState() throws Exception {
+ schemaRecordsState.clear();
+
+ if (engineInstanceName != null) {
+ schemaRecordsState.add(engineInstanceName);
+ Collection<SchemaRecord> records = retrieveHistory(engineInstanceName);
+ DocumentWriter writer = DocumentWriter.defaultWriter();
+ for (SchemaRecord record : records) {
+ schemaRecordsState.add(writer.write(record.toDocument()));
+ }
+ }
+ }
+
+ @Override
+ public void run(SourceContext<T> sourceContext) throws Exception {
+
+ // initialize metrics
+ // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
+ final Method getMetricGroupMethod =
+ getRuntimeContext().getClass().getMethod("getMetricGroup");
+ getMetricGroupMethod.setAccessible(true);
+ final MetricGroup metricGroup =
+ (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+ metricGroup.gauge(
+ "currentFetchEventTimeLag",
+ (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+ metricGroup.gauge(
+ "currentEmitEventTimeLag",
+ (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+ metricGroup.gauge(
+ "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
+ if (StringUtils.isNotEmpty(this.inlongMetric)) {
+ String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+ inLongGroupId = inlongMetricArray[0];
+ inLongStreamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
+ metricData = new SourceMetricData(inLongGroupId, inLongStreamId, nodeId, metricGroup);
+ metricData.registerMetricsForNumRecordsIn();
+ metricData.registerMetricsForNumBytesIn();
+ metricData.registerMetricsForNumBytesInPerSecond();
+ metricData.registerMetricsForNumRecordsInPerSecond();
+ }
+ if (auditHostAndPorts != null) {
+ AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+ auditImp = AuditImp.getInstance();
+ }
+ properties.setProperty("name", "engine");
+ properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
+ if (restoredOffsetState != null) {
+ // restored from state
+ properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
+ }
+ // DO NOT include schema change, e.g. DDL
+ properties.setProperty("include.schema.changes", "false");
+ // disable the offset flush totally
+ properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
+ // disable tombstones
+ properties.setProperty("tombstones.on.delete", "false");
+ if (engineInstanceName == null) {
+ // not restore from recovery
+ engineInstanceName = UUID.randomUUID().toString();
+ }
+ // history instance name to initialize FlinkDatabaseHistory
+ properties.setProperty(
+ FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
+ // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't
+ // continue to read binlog
+ // see
+ // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
+ // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
+ properties.setProperty("database.history", determineDatabase().getCanonicalName());
+
+ // we have to filter out the heartbeat events, otherwise the deserializer will fail
+ String dbzHeartbeatPrefix =
+ properties.getProperty(
+ Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
+ Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
+ this.debeziumChangeFetcher =
+ new DebeziumChangeFetcher<>(
+ sourceContext,
+ new DebeziumDeserializationSchema<T>() {
+ @Override
+ public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+ outputMetrics(record);
+ deserializer.deserialize(record, out);
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializer.getProducedType();
+ }
+ },
+ restoredOffsetState == null, // DB snapshot phase if restore state is null
+ dbzHeartbeatPrefix,
+ handover);
+
+ // create the engine with this configuration ...
+ this.engine =
+ DebeziumEngine.create(Connect.class)
+ .using(properties)
+ .notifying(changeConsumer)
+ .using(OffsetCommitPolicy.always())
+ .using(
+ (success, message, error) -> {
+ if (success) {
+ // Close the handover and prepare to exit.
+ handover.close();
+ } else {
+ handover.reportError(error);
+ }
+ })
+ .build();
+
+ // run the engine asynchronously
+ executor.execute(engine);
+ debeziumStarted = true;
+
+ // start the real debezium consumer
+ debeziumChangeFetcher.runFetchLoop();
+ }
+
+ private void outputMetrics(SourceRecord record) {
+ if (metricData != null) {
+ metricData.getNumRecordsIn().inc(1L);
+ metricData.getNumBytesIn()
+ .inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+ }
+ if (auditImp != null) {
+ auditImp.add(
+ Constants.AUDIT_SORT_INPUT,
+ inLongGroupId,
+ inLongStreamId,
+ System.currentTimeMillis(),
+ 1,
+ record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ if (!debeziumStarted) {
+ LOG.debug("notifyCheckpointComplete() called when engine is not started.");
+ return;
+ }
+
+ final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
+ if (fetcher == null) {
+ LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+ return;
+ }
+
+ try {
+ final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn(
+ "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+ getRuntimeContext().getIndexOfThisSubtask(),
+ checkpointId);
+ return;
+ }
+
+ byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
+
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingOffsetsToCommit.remove(0);
+ }
+
+ if (serializedOffsets == null || serializedOffsets.length == 0) {
+ LOG.debug(
+ "Consumer subtask {} has empty checkpoint state.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ return;
+ }
+
+ DebeziumOffset offset =
+ DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
+ changeConsumer.commitOffset(offset);
+ } catch (Exception e) {
+ // ignore exception if we are no longer running
+ LOG.warn("Ignore error when committing offset to database.", e);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // safely and gracefully stop the engine
+ shutdownEngine();
+ if (debeziumChangeFetcher != null) {
+ debeziumChangeFetcher.close();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ cancel();
+
+ if (executor != null) {
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ }
+
+ super.close();
+ }
+
+ /**
+ * Safely and gracefully stop the Debezium engine.
+ */
+ private void shutdownEngine() {
+ try {
+ if (engine != null) {
+ engine.close();
+ }
+ } catch (IOException e) {
+ ExceptionUtils.rethrow(e);
+ } finally {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+
+ debeziumStarted = false;
+
+ if (handover != null) {
+ handover.close();
+ }
+ }
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializer.getProducedType();
+ }
+
+ @VisibleForTesting
+ public LinkedMap getPendingOffsetsToCommit() {
+ return pendingOffsetsToCommit;
+ }
+
+ @VisibleForTesting
+ public boolean getDebeziumStarted() {
+ return debeziumStarted;
+ }
+
+ private Class<?> determineDatabase() {
+ boolean isCompatibleWithLegacy =
+ FlinkDatabaseHistory.isCompatible(retrieveHistory(engineInstanceName));
+ if (LEGACY_IMPLEMENTATION_VALUE.equals(properties.get(LEGACY_IMPLEMENTATION_KEY))) {
+ // specifies the legacy implementation but the state may be incompatible
+ if (isCompatibleWithLegacy) {
+ return FlinkDatabaseHistory.class;
+ } else {
+ throw new IllegalStateException(
+ "The configured option 'debezium.internal.implementation' is 'legacy', but the state of "
+ + "source is incompatible with this implementation, you should remove the the option.");
+ }
+ } else if (FlinkDatabaseSchemaHistory.isCompatible(retrieveHistory(engineInstanceName))) {
+ // tries the non-legacy first
+ return FlinkDatabaseSchemaHistory.class;
+ } else if (isCompatibleWithLegacy) {
+ // fallback to legacy if possible
+ return FlinkDatabaseHistory.class;
+ } else {
+ // impossible
+ throw new IllegalStateException("Can't determine which DatabaseHistory to use.");
+ }
+ }
+
+ @VisibleForTesting
+ public String getEngineInstanceName() {
+ return engineInstanceName;
+ }
+
+ public SourceMetricData getMetricData() {
+ return metricData;
+ }
+}
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
new file mode 100644
index 000000000..e6a7e5be6
--- /dev/null
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.sqlserver.table;
+
+import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
+import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/** Factory for creating configured instance of {@link SqlServerTableSource}. */
+public class SqlServerTableFactory implements DynamicTableSourceFactory {
+
+ private static final String IDENTIFIER = "sqlserver-cdc-inlong";
+
+ private static final ConfigOption<String> HOSTNAME =
+ ConfigOptions.key("hostname")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("IP address or hostname of the SqlServer database server.");
+
+ private static final ConfigOption<Integer> PORT =
+ ConfigOptions.key("port")
+ .intType()
+ .defaultValue(1433)
+ .withDescription("Integer port number of the SqlServer database server.");
+
+ private static final ConfigOption<String> USERNAME =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the SqlServer database to use when connecting to the SqlServer database server.");
+
+ private static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Password to use when connecting to the SqlServer database server.");
+
+ private static final ConfigOption<String> DATABASE_NAME =
+ ConfigOptions.key("database-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Database name of the SqlServer server to monitor.");
+
+ private static final ConfigOption<String> SCHEMA_NAME =
+ ConfigOptions.key("schema-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Schema name of the SqlServer database to monitor.");
+
+ private static final ConfigOption<String> TABLE_NAME =
+ ConfigOptions.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name of the SqlServer database to monitor.");
+
+ public static final ConfigOption<String> SERVER_TIME_ZONE =
+ ConfigOptions.key("server-time-zone")
+ .stringType()
+ .defaultValue("UTC")
+ .withDescription("The session time zone in database server.");
+
+ public static final ConfigOption<String> SCAN_STARTUP_MODE =
+ ConfigOptions.key("scan.startup.mode")
+ .stringType()
+ .defaultValue("initial")
+ .withDescription(
+ "Optional startup mode for SqlServer CDC consumer, valid enumerations are "
+ + "\"initial\", \"initial-only\", \"latest-offset\"");
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
+
+ final ReadableConfig config = helper.getOptions();
+ String hostname = config.get(HOSTNAME);
+ String username = config.get(USERNAME);
+ String password = config.get(PASSWORD);
+ String schemaName = config.get(SCHEMA_NAME);
+ String databaseName = config.get(DATABASE_NAME);
+ String tableName = config.get(TABLE_NAME);
+ String inlongMetric = config.get(INLONG_METRIC);
+ String auditHostAndPorts = config.get(INLONG_AUDIT);
+ ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
+ int port = config.get(PORT);
+ StartupOptions startupOptions = getStartupOptions(config);
+ ResolvedSchema physicalSchema =
+ getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
+
+ return new SqlServerTableSource(
+ physicalSchema,
+ port,
+ hostname,
+ databaseName,
+ schemaName,
+ tableName,
+ serverTimeZone,
+ username,
+ password,
+ getDebeziumProperties(context.getCatalogTable().getOptions()),
+ startupOptions,
+ inlongMetric, auditHostAndPorts);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(HOSTNAME);
+ options.add(USERNAME);
+ options.add(PASSWORD);
+ options.add(DATABASE_NAME);
+ options.add(SCHEMA_NAME);
+ options.add(TABLE_NAME);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PORT);
+ options.add(SERVER_TIME_ZONE);
+ options.add(SCAN_STARTUP_MODE);
+ options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
+ return options;
+ }
+
+ private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+ private static final String SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY = "initial-only";
+ private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+
+ private static StartupOptions getStartupOptions(ReadableConfig config) {
+ String modeString = config.get(SCAN_STARTUP_MODE);
+
+ switch (modeString.toLowerCase()) {
+ case SCAN_STARTUP_MODE_VALUE_INITIAL:
+ return StartupOptions.initial();
+
+ case SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY:
+ return StartupOptions.initialOnly();
+
+ case SCAN_STARTUP_MODE_VALUE_LATEST:
+ return StartupOptions.latest();
+
+ default:
+ throw new ValidationException(
+ String.format(
+ "Invalid value for option '%s'. Supported values are [%s, %s, %s], but was: %s",
+ SCAN_STARTUP_MODE.key(),
+ SCAN_STARTUP_MODE_VALUE_INITIAL,
+ SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY,
+ SCAN_STARTUP_MODE_VALUE_LATEST,
+ modeString));
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableSource.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableSource.java
new file mode 100644
index 000000000..71dc6c784
--- /dev/null
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableSource.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.sqlserver.table;
+
+import com.ververica.cdc.connectors.sqlserver.table.SqlServerDeserializationConverterFactory;
+import com.ververica.cdc.connectors.sqlserver.table.SqlServerReadableMetadata;
+import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.inlong.sort.cdc.sqlserver.SqlServerSource;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a SqlServer source from a logical
+ * description.
+ *
+ * <p>Besides the connection settings, it carries the InLong metric and audit settings. These are
+ * passed to {@link SqlServerSource} when the runtime source function is built, and they take part
+ * in {@link #equals(Object)} and {@link #hashCode()}.
+ */
+public class SqlServerTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+    private final ResolvedSchema physicalSchema;
+    private final int port;
+    private final String hostname;
+    private final String database;
+    private final String schemaName;
+    private final String tableName;
+    private final ZoneId serverTimeZone;
+    private final String username;
+    private final String password;
+    private final Properties dbzProperties;
+    private final StartupOptions startupOptions;
+    /** InLong metric option value, forwarded to {@link SqlServerSource}. */
+    private final String inlongMetric;
+    /** InLong audit host/port option value, forwarded to {@link SqlServerSource}. */
+    private final String auditHostAndPorts;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    /**
+     * Creates a table source for a single SqlServer table.
+     *
+     * @param physicalSchema physical columns of the table
+     * @param port server port
+     * @param hostname server host; must not be null
+     * @param database database name; must not be null
+     * @param schemaName schema name; must not be null
+     * @param tableName table name; must not be null
+     * @param serverTimeZone session time zone of the server
+     * @param username login user; must not be null
+     * @param password login password; must not be null
+     * @param dbzProperties extra Debezium properties
+     * @param startupOptions where the source starts reading
+     * @param inlongMetric InLong metric option value
+     * @param auditHostAndPorts InLong audit host/port option value
+     * @throws NullPointerException if a parameter marked as non-null is null
+     */
+    public SqlServerTableSource(
+            ResolvedSchema physicalSchema,
+            int port,
+            String hostname,
+            String database,
+            String schemaName,
+            String tableName,
+            ZoneId serverTimeZone,
+            String username,
+            String password,
+            Properties dbzProperties,
+            StartupOptions startupOptions,
+            String inlongMetric,
+            String auditHostAndPorts) {
+        this.physicalSchema = physicalSchema;
+        this.port = port;
+        this.hostname = checkNotNull(hostname);
+        this.database = checkNotNull(database);
+        this.schemaName = checkNotNull(schemaName);
+        this.tableName = checkNotNull(tableName);
+        this.serverTimeZone = serverTimeZone;
+        this.username = checkNotNull(username);
+        this.password = checkNotNull(password);
+        this.dbzProperties = dbzProperties;
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
+        this.startupOptions = startupOptions;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+    }
+
+    /** The source emits the full changelog: inserts, both update halves, and deletes. */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    /**
+     * Builds the {@link DebeziumSourceFunction} that reads {@code schemaName.tableName} and
+     * deserializes change records into {@link RowData} of the produced type.
+     */
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
+        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
+
+        DebeziumDeserializationSchema<RowData> deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setMetadataConverters(metadataConverters)
+                        .setResultTypeInfo(typeInfo)
+                        .setServerTimeZone(serverTimeZone)
+                        .setUserDefinedConverterFactory(
+                                SqlServerDeserializationConverterFactory.instance())
+                        .build();
+        DebeziumSourceFunction<RowData> sourceFunction =
+                SqlServerSource.<RowData>builder()
+                        .hostname(hostname)
+                        .port(port)
+                        .database(database)
+                        .tableList(schemaName + "." + tableName)
+                        .username(username)
+                        .password(password)
+                        .debeziumProperties(dbzProperties)
+                        .startupOptions(startupOptions)
+                        .deserializer(deserializer)
+                        .inlongMetric(inlongMetric)
+                        .auditHostAndPorts(auditHostAndPorts)
+                        .build();
+        // false: the source is unbounded
+        return SourceFunctionProvider.of(sourceFunction, false);
+    }
+
+    /**
+     * Resolves the requested metadata keys to their converters, in request order.
+     *
+     * @throws IllegalStateException if a key is not a {@link SqlServerReadableMetadata} key
+     */
+    private MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key ->
+                                Stream.of(SqlServerReadableMetadata.values())
+                                        .filter(m -> m.getKey().equals(key))
+                                        .findFirst()
+                                        .orElseThrow(IllegalStateException::new))
+                .map(SqlServerReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        SqlServerTableSource source =
+                new SqlServerTableSource(
+                        physicalSchema,
+                        port,
+                        hostname,
+                        database,
+                        schemaName,
+                        tableName,
+                        serverTimeZone,
+                        username,
+                        password,
+                        dbzProperties,
+                        startupOptions,
+                        inlongMetric,
+                        auditHostAndPorts);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
+    }
+
+    /**
+     * Compares every configuration field, including the InLong metric and audit settings, so that
+     * sources differing only in those settings are not treated as equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SqlServerTableSource that = (SqlServerTableSource) o;
+        return port == that.port
+                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(hostname, that.hostname)
+                && Objects.equals(database, that.database)
+                && Objects.equals(schemaName, that.schemaName)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(serverTimeZone, that.serverTimeZone)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password)
+                && Objects.equals(dbzProperties, that.dbzProperties)
+                && Objects.equals(startupOptions, that.startupOptions)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(inlongMetric, that.inlongMetric)
+                && Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
+    }
+
+    /** Hashes the same fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                schemaName,
+                tableName,
+                serverTimeZone,
+                username,
+                password,
+                dbzProperties,
+                startupOptions,
+                producedDataType,
+                metadataKeys,
+                inlongMetric,
+                auditHostAndPorts);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "SqlServer-CDC";
+    }
+
+    /** Lists every {@link SqlServerReadableMetadata} key with its data type. */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(SqlServerReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                SqlServerReadableMetadata::getKey,
+                                SqlServerReadableMetadata::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+}
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..156311bd2
--- /dev/null
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.cdc.sqlserver.table.SqlServerTableFactory
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index db56dcfee..77f3d5e07 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -510,6 +510,13 @@
Source : org.apache.flink:flink-connector-jdbc_2.11:1.13.5 (Please note that the software have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE
+ 1.3.8 inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
+ inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
+ inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableSource.java
+ inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/SqlServerSource.java
+ Source : flink-cdc-connectors 2.2.1 (Please note that the software have been modified.)
+ License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+
=======================================================================
Apache InLong Subcomponents: