Posted to commits@inlong.apache.org by zi...@apache.org on 2023/06/26 02:11:54 UTC

[inlong] branch master updated: [INLONG-8307][Sort] Fix job restart failed from savepoint when set 'scan.startup.mode' = 'timestamp|earliest-offset|specific-offset' (#8308)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cf4b01e3bd [INLONG-8307][Sort] Fix job restart failed from savepoint when set 'scan.startup.mode' = 'timestamp|earliest-offset|specific-offset' (#8308)
cf4b01e3bd is described below

commit cf4b01e3bdb3bdbc61c5043fab3d275c44c882f0
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Mon Jun 26 10:11:49 2023 +0800

    [INLONG-8307][Sort] Fix job restart failed from savepoint when set 'scan.startup.mode' = 'timestamp|earliest-offset|specific-offset' (#8308)
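
    For context, the affected 'scan.startup.mode' values are configured on the
    InLong Sort MySQL CDC source table. Below is a minimal sketch of a job that
    starts from a timestamp; the connector identifier, connection options, and
    timestamp value are illustrative assumptions, not taken from this commit:

        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;

        public class TimestampStartupExample {
            public static void main(String[] args) {
                TableEnvironment tEnv = TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
                // Register a MySQL CDC source that starts reading the binlog from a
                // given timestamp; restoring such a job from a savepoint is the case
                // addressed by this fix.
                tEnv.executeSql(
                        "CREATE TABLE orders_src (id INT, name STRING) WITH ("
                                + " 'connector' = 'mysql-cdc-inlong',"
                                + " 'hostname' = 'localhost',"
                                + " 'port' = '3306',"
                                + " 'username' = 'user',"
                                + " 'password' = 'pass',"
                                + " 'database-name' = 'db',"
                                + " 'table-name' = 'orders',"
                                + " 'scan.startup.mode' = 'timestamp',"
                                + " 'scan.startup.timestamp-millis' = '1687745514000')");
            }
        }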
---
 .../sort-connectors/mysql-cdc/pom.xml              |    1 +
 .../io/debezium/connector/mysql/GtidUtils.java     |   94 ++
 .../mysql/MySqlStreamingChangeEventSource.java     | 1262 ++++++++++++++++++++
 .../debezium/task/context/StatefulTaskContext.java |   15 +-
 licenses/inlong-sort-connectors/LICENSE            |    2 +
 5 files changed, 1372 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/pom.xml
index 92c9a28d08..23a5c165a6 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/pom.xml
@@ -118,6 +118,7 @@
                                     <artifact>org.apache.inlong:sort-connector-*</artifact>
                                     <includes>
                                         <include>org/apache/inlong/**</include>
+                                        <include>io/debezium/connector/mysql/**</include>
                                         <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
                                     </includes>
                                 </filter>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
new file mode 100644
index 0000000000..52a6d5dace
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql;
+
+import io.debezium.connector.mysql.GtidSet.UUIDSet;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for handling GTIDs. */
+public class GtidUtils {
+
+    /**
+     * This method corrects the GTID set that has been restored from a state or checkpoint using the
+     * GTID set fetched from the server via SHOW MASTER STATUS. During the correction process, the
+     * restored GTID set is adjusted according to the server's GTID set to ensure it does not exceed
+     * the latter. For each UUID in the restored GTID set, if it exists in the server's GTID set,
+     * then it will be adjusted according to the server's GTID set; if it does not exist in the
+     * server's GTID set, it will be directly added to the new GTID set.
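+     *
+     * <p>Illustrative example (hypothetical GTIDs): given a server GTID set of
+     * "uuid1:1-100" and a restored GTID set of "uuid1:1-80,uuid2:1-10", the result is
+     * "uuid1:1-80,uuid2:1-10": the uuid1 intervals are capped at the restored end,
+     * while uuid2, unknown to the server, is carried over unchanged.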
+     */
+    public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
+        Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
+        serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
+        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
+            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
+            if (serverUuidSet != null) {
+                long restoredIntervalEnd = getIntervalEnd(uuidSet);
+                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
+                        new ArrayList<>();
+                for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) {
+                    if (serverInterval.getEnd() <= restoredIntervalEnd) {
+                        newIntervals.add(
+                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
+                                        serverInterval.getStart(), serverInterval.getEnd()));
+                    } else if (serverInterval.getStart() <= restoredIntervalEnd
+                            && serverInterval.getEnd() > restoredIntervalEnd) {
+                        newIntervals.add(
+                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
+                                        serverInterval.getStart(), restoredIntervalEnd));
+                    }
+                }
+                newSet.put(
+                        uuidSet.getUUID(),
+                        new UUIDSet(
+                                new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
+                                        uuidSet.getUUID(), newIntervals)));
+            } else {
+                newSet.put(uuidSet.getUUID(), uuidSet);
+            }
+        }
+        return new GtidSet(newSet);
+    }
+
+    /**
+     * This method merges one GTID set (toMerge) into another (base), without overwriting the
+     * existing elements in the base GTID set.
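+     *
+     * <p>For example (hypothetical GTIDs): merging toMerge = "uuid1:1-50,uuid2:1-10"
+     * into base = "uuid1:1-100" yields "uuid1:1-100,uuid2:1-10"; uuid1 keeps the base
+     * intervals, and only the previously absent uuid2 is added.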
+     */
+    public static GtidSet mergeGtidSetInto(GtidSet base, GtidSet toMerge) {
+        Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
+        base.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
+        for (GtidSet.UUIDSet uuidSet : toMerge.getUUIDSets()) {
+            if (!newSet.containsKey(uuidSet.getUUID())) {
+                newSet.put(uuidSet.getUUID(), uuidSet);
+            }
+        }
+        return new GtidSet(newSet);
+    }
+
+    private static long getIntervalEnd(GtidSet.UUIDSet uuidSet) {
+        return uuidSet.getIntervals().stream()
+                .mapToLong(GtidSet.Interval::getEnd)
+                .max()
+                .getAsLong();
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
new file mode 100644
index 0000000000..16da062352
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -0,0 +1,1262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventData;
+import com.github.shyiko.mysql.binlog.event.EventHeader;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.GtidEventData;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
+import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+import com.github.shyiko.mysql.binlog.network.AuthenticationException;
+import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
+import com.github.shyiko.mysql.binlog.network.SSLMode;
+import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
+import com.github.shyiko.mysql.binlog.network.ServerException;
+import io.debezium.DebeziumException;
+import io.debezium.annotation.SingleThreadAccess;
+import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode;
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
+import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
+import io.debezium.data.Envelope.Operation;
+import io.debezium.function.BlockingConsumer;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
+import io.debezium.relational.TableId;
+import io.debezium.schema.SchemaChangeEvent;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import io.debezium.util.Strings;
+import io.debezium.util.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import static io.debezium.util.Strings.isNullOrEmpty;
+
+/**
+ * Copied from Debezium project to fix
+ * https://github.com/apache/inlong/issues/8307.
+ *
+ * <p>Line 1134-1139: Adjusts the GTID merging logic to support recovering a job that
+ * previously specified a starting offset on startup.
+ */
+public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStreamingChangeEventSource.class);
+
+    private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
+
+    private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class);
+    private final BinaryLogClient client;
+    private final MySqlStreamingChangeEventSourceMetrics metrics;
+    private final Clock clock;
+    private final EventProcessingFailureHandlingMode eventDeserializationFailureHandlingMode;
+    private final EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode;
+
+    private int startingRowNumber = 0;
+    private long initialEventsToSkip = 0L;
+    private boolean skipEvent = false;
+    private boolean ignoreDmlEventByGtidSource = false;
+    private final Predicate<String> gtidDmlSourceFilter;
+    private final AtomicLong totalRecordCounter = new AtomicLong();
+    private volatile Map<String, ?> lastOffset = null;
+    private com.github.shyiko.mysql.binlog.GtidSet gtidSet;
+    private final float heartbeatIntervalFactor = 0.8f;
+    private final Map<String, Thread> binaryLogClientThreads = new ConcurrentHashMap<>(4);
+    private final MySqlTaskContext taskContext;
+    private final MySqlConnectorConfig connectorConfig;
+    private final MySqlConnection connection;
+    private final EventDispatcher<TableId> eventDispatcher;
+    private final MySqlOffsetContext offsetContext;
+    private final ErrorHandler errorHandler;
+    private boolean isRestoredFromCheckpoint = false;
+
+    @SingleThreadAccess("binlog client thread")
+    private Instant eventTimestamp;
+
+    public static class BinlogPosition {
+
+        final String filename;
+        final long position;
+
+        public BinlogPosition(String filename, long position) {
+            assert filename != null;
+
+            this.filename = filename;
+            this.position = position;
+        }
+
+        public String getFilename() {
+            return filename;
+        }
+
+        public long getPosition() {
+            return position;
+        }
+
+        @Override
+        public String toString() {
+            return filename + "/" + position;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + filename.hashCode();
+            result = prime * result + (int) (position ^ (position >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            BinlogPosition other = (BinlogPosition) obj;
+            if (!filename.equals(other.filename)) {
+                return false;
+            }
+            if (position != other.position) {
+                return false;
+            }
+            return true;
+        }
+    }
+
+    @FunctionalInterface
+    private static interface BinlogChangeEmitter<T> {
+
+        void emit(TableId tableId, T data) throws InterruptedException;
+    }
+
+    public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlOffsetContext offsetContext,
+            MySqlConnection connection,
+            EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock,
+            MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics) {
+
+        this.taskContext = taskContext;
+        this.connectorConfig = connectorConfig;
+        this.connection = connection;
+        this.clock = clock;
+        this.eventDispatcher = dispatcher;
+        this.errorHandler = errorHandler;
+        // With snapshot mode NEVER, the initial context is not created by the snapshot
+        this.offsetContext = (offsetContext == null) ? MySqlOffsetContext.initial(connectorConfig) : offsetContext;
+        this.metrics = metrics;
+
+        eventDeserializationFailureHandlingMode = connectorConfig.getEventProcessingFailureHandlingMode();
+        inconsistentSchemaHandlingMode = connectorConfig.inconsistentSchemaFailureHandlingMode();
+
+        // Set up the log reader ...
+        client = taskContext.getBinaryLogClient();
+        // BinaryLogClient will overwrite thread names later
+        client.setThreadFactory(
+                Threads.threadFactory(MySqlConnector.class, connectorConfig.getLogicalName(), "binlog-client", false,
+                        false,
+                        x -> binaryLogClientThreads.put(x.getName(), x)));
+        client.setServerId(connectorConfig.serverId());
+        client.setSSLMode(sslModeFor(connectorConfig.sslMode()));
+        if (connectorConfig.sslModeEnabled()) {
+            SSLSocketFactory sslSocketFactory = getBinlogSslSocketFactory(connectorConfig, connection);
+            if (sslSocketFactory != null) {
+                client.setSslSocketFactory(sslSocketFactory);
+            }
+        }
+        Configuration configuration = connectorConfig.getConfig();
+        client.setKeepAlive(configuration.getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
+        final long keepAliveInterval = configuration.getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
+        client.setKeepAliveInterval(keepAliveInterval);
+        // Since heartbeatInterval should be less than keepAliveInterval, we multiply keepAliveInterval
+        // by heartbeatIntervalFactor and use the result as the heartbeatInterval. The default value of
+        // heartbeatIntervalFactor is 0.8, and we believe the remaining time (0.2 * keepAliveInterval)
+        // is enough to process the packets received from the MySQL server.
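+        // For example, with a keepAliveInterval of 60000 ms (a typical default), this
+        // yields a heartbeatInterval of 48000 ms.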
+        client.setHeartbeatInterval((long) (keepAliveInterval * heartbeatIntervalFactor));
+
+        boolean filterDmlEventsByGtidSource =
+                configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
+        gtidDmlSourceFilter = filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null;
+
+        // Set up the event deserializer with additional type(s) ...
+        final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
+        EventDeserializer eventDeserializer = new EventDeserializer() {
+
+            @Override
+            public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
+                try {
+                    // Delegate to the superclass ...
+                    Event event = super.nextEvent(inputStream);
+
+                    // We have to record the most recent TableMapEventData for each table number for our custom
+                    // deserializers ...
+                    if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
+                        TableMapEventData tableMapEvent = event.getData();
+                        tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
+                    }
+                    return event;
+                }
+                // DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
+                catch (EventDataDeserializationException edde) {
+                    // DBZ-3095 As of Java 15, when reaching EOF in the binlog stream, the polling loop in
+                    // BinaryLogClient#listenForEventPackets() keeps returning values != -1 from peek();
+                    // this causes the loop to never finish
+                    // Propagating the exception (either EOF or socket closed) causes the loop to be aborted
+                    // in this case
+                    if (edde.getCause() instanceof IOException) {
+                        throw edde;
+                    }
+
+                    EventHeaderV4 header = new EventHeaderV4();
+                    header.setEventType(EventType.INCIDENT);
+                    header.setTimestamp(edde.getEventHeader().getTimestamp());
+                    header.setServerId(edde.getEventHeader().getServerId());
+
+                    if (edde.getEventHeader() instanceof EventHeaderV4) {
+                        header.setEventLength(((EventHeaderV4) edde.getEventHeader()).getEventLength());
+                        header.setNextPosition(((EventHeaderV4) edde.getEventHeader()).getNextPosition());
+                        header.setFlags(((EventHeaderV4) edde.getEventHeader()).getFlags());
+                    }
+
+                    EventData data = new EventDataDeserializationExceptionData(edde);
+                    return new Event(header, data);
+                }
+            }
+        };
+
+        // Add our custom deserializers ...
+        eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
+        eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
+        eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
+                new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
+        eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
+                new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
+        eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
+                new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
+        eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
+                new RowDeserializers.WriteRowsDeserializer(
+                        tableMapEventByTableId).setMayContainExtraInformation(true));
+        eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
+                new RowDeserializers.UpdateRowsDeserializer(
+                        tableMapEventByTableId).setMayContainExtraInformation(true));
+        eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
+                new RowDeserializers.DeleteRowsDeserializer(
+                        tableMapEventByTableId).setMayContainExtraInformation(true));
+        client.setEventDeserializer(eventDeserializer);
+    }
+
+    protected void onEvent(Event event) {
+        long ts = 0;
+
+        if (event.getHeader().getEventType() == EventType.HEARTBEAT) {
+            // HEARTBEAT events have no timestamp but are fired only when
+            // there is no traffic on the connection which means we are caught-up
+            // https://dev.mysql.com/doc/internals/en/heartbeat-event.html
+            metrics.setMilliSecondsBehindSource(ts);
+            return;
+        }
+
+        // MySQL has seconds resolution but mysql-binlog-connector-java returns
+        // a value in milliseconds
+        long eventTs = event.getHeader().getTimestamp();
+
+        if (eventTs == 0) {
+            LOGGER.trace("Received unexpected event with 0 timestamp: {}", event);
+            return;
+        }
+
+        ts = clock.currentTimeInMillis() - eventTs;
+        LOGGER.trace("Current milliseconds behind source: {} ms", ts);
+        metrics.setMilliSecondsBehindSource(ts);
+    }
+
+    protected void ignoreEvent(Event event) {
+        LOGGER.trace("Ignoring event due to missing handler: {}", event);
+    }
+
+    protected void handleEvent(Event event) {
+        if (event == null) {
+            return;
+        }
+
+        final EventHeader eventHeader = event.getHeader();
+        // Update the source offset info. Note that the client returns the value in *milliseconds*,
+        // even though the binlog contains only *seconds* precision ...
+        // HEARTBEAT events have no timestamp; only set the timestamp if the event is not a HEARTBEAT
+        eventTimestamp = !eventHeader.getEventType().equals(EventType.HEARTBEAT)
+                ? Instant.ofEpochMilli(eventHeader.getTimestamp())
+                : null;
+        offsetContext.setBinlogThread(eventHeader.getServerId());
+
+        final EventType eventType = eventHeader.getEventType();
+        if (eventType == EventType.ROTATE) {
+            EventData eventData = event.getData();
+            RotateEventData rotateEventData;
+            if (eventData instanceof EventDeserializer.EventDataWrapper) {
+                rotateEventData = (RotateEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal();
+            } else {
+                rotateEventData = (RotateEventData) eventData;
+            }
+            offsetContext.setBinlogStartPoint(rotateEventData.getBinlogFilename(), rotateEventData.getBinlogPosition());
+        } else if (eventHeader instanceof EventHeaderV4) {
+            EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
+            offsetContext.setEventPosition(trackableEventHeader.getPosition(), trackableEventHeader.getEventLength());
+        }
+
+        // If there is a handler for this event, forward the event to it ...
+        try {
+            // Forward the event to the handler ...
+            eventHandlers.getOrDefault(eventType, this::ignoreEvent).accept(event);
+
+            // Generate heartbeat message if the time is right
+            eventDispatcher.dispatchHeartbeatEvent(offsetContext);
+
+            // Capture that we've completed another event ...
+            offsetContext.completeEvent();
+
+            if (skipEvent) {
+                // We're in the mode of skipping events and we just skipped this one, so decrement our skip count ...
+                --initialEventsToSkip;
+                skipEvent = initialEventsToSkip > 0;
+            }
+        } catch (RuntimeException e) {
+            // There was an error in the event handler, so propagate the failure to Kafka Connect ...
+            logStreamingSourceState();
+            errorHandler.setProducerThrowable(new DebeziumException("Error processing binlog event", e));
+            // Do not stop the client, since Kafka Connect should stop the connector on its own
+            // (and doing it here may cause problems the second time it is stopped).
+            // We can clear the listeners though so that we ignore all future events ...
+            eventHandlers.clear();
+            LOGGER.info(
+                    "Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored.");
+        } catch (InterruptedException e) {
+            // Most likely because this reader was stopped and our thread was interrupted ...
+            Thread.currentThread().interrupt();
+            eventHandlers.clear();
+            LOGGER.info("Stopped processing binlog events due to thread interruption");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T extends EventData> T unwrapData(Event event) {
+        EventData eventData = event.getData();
+        if (eventData instanceof EventDeserializer.EventDataWrapper) {
+            eventData = ((EventDeserializer.EventDataWrapper) eventData).getInternal();
+        }
+        return (T) eventData;
+    }
+
+    /**
+     * Handle the supplied event that signals that mysqld has stopped.
+     *
+     * @param event the server stopped event to be processed; may not be null
+     */
+    protected void handleServerStop(Event event) {
+        LOGGER.debug("Server stopped: {}", event);
+    }
+
+    /**
+     * Handle the supplied event that is sent by a primary to a replica to let the replica know that the primary is still alive. Not
+     * written to a binary log.
+     *
+     * @param event the server heartbeat event to be processed; may not be null
+     */
+    protected void handleServerHeartbeat(Event event) {
+        LOGGER.trace("Server heartbeat: {}", event);
+    }
+
+    /**
+     * Handle the supplied event that signals that an out-of-the-ordinary event occurred on the primary. It notifies
+     * the replica that something happened on the primary that might cause data to be in an inconsistent state.
+     *
+     * @param event the server incident event to be processed; may not be null
+     */
+    protected void handleServerIncident(Event event) {
+        if (event.getData() instanceof EventDataDeserializationExceptionData) {
+            metrics.onErroneousEvent("source = " + event.toString());
+            EventDataDeserializationExceptionData data = event.getData();
+
+            // safe cast, instantiated that ourselves
+            EventHeaderV4 eventHeader = (EventHeaderV4) data.getCause().getEventHeader();
+
+            // logging some additional context but not the exception itself, this will happen in handleEvent()
+            if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+                LOGGER.error(
+                        "Error while deserializing binlog event at offset {}.{}" +
+                                "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        offsetContext.getOffset(),
+                        System.lineSeparator(),
+                        eventHeader.getPosition(),
+                        eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename());
+
+                throw new RuntimeException(data.getCause());
+            } else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+                LOGGER.warn(
+                        "Error while deserializing binlog event at offset {}.{}" +
+                                "This exception will be ignored and the event be skipped.{}" +
+                                "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        offsetContext.getOffset(),
+                        System.lineSeparator(),
+                        System.lineSeparator(),
+                        eventHeader.getPosition(),
+                        eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename(),
+                        data.getCause());
+            }
+        } else {
+            LOGGER.error("Server incident: {}", event);
+        }
+    }
+
+    /**
+     * Handle the supplied event with a {@link RotateEventData} that signals the logs are being rotated. This means that either
+     * the server was restarted, or the binlog has transitioned to a new file. In either case, subsequent table numbers will be
+     * different than those seen to this point.
+     *
+     * @param event the database change data event to be processed; may not be null
+     */
+    protected void handleRotateLogsEvent(Event event) {
+        LOGGER.debug("Rotating logs: {}", event);
+        RotateEventData command = unwrapData(event);
+        assert command != null;
+        taskContext.getSchema().clearTableMappings();
+    }
+
+    /**
+     * Handle the supplied event with a {@link GtidEventData} that signals the beginning of a GTID transaction.
+     * We don't yet know whether this transaction contains any events we're interested in, but we have to record
+     * it so that we know the position of this event and know we've processed the binlog to this point.
+     * <p>
+     * Note that this captures the current GTID and complete GTID set, regardless of whether the connector is
+     * {@link MySqlTaskContext#gtidSourceFilter() filtering} the GTID set upon connection. We do this because
+     * we actually want to capture all GTID set values found in the binlog, whether or not we process them.
+     * However, only when we connect do we actually want to pass to MySQL only those GTID ranges that are applicable
+     * per the configuration.
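+     * <p>
+     * For instance, given a (hypothetical) GTID "3E11FA47-71CA-11E1-9E33-C80AA9429562:23",
+     * the UUID portion before the colon is what the configured GTID source filter is
+     * tested against below.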
+     *
+     * @param event the GTID event to be processed; may not be null
+     */
+    protected void handleGtidEvent(Event event) {
+        LOGGER.debug("GTID transaction: {}", event);
+        GtidEventData gtidEvent = unwrapData(event);
+        String gtid = gtidEvent.getGtid();
+        gtidSet.add(gtid);
+        offsetContext.startGtid(gtid, gtidSet.toString()); // rather than use the client's GTID set
+        ignoreDmlEventByGtidSource = false;
+        if (gtidDmlSourceFilter != null && gtid != null) {
+            String trimmedGtid = gtid.trim();
+            String uuid = trimmedGtid.substring(0, trimmedGtid.indexOf(":"));
+            if (!gtidDmlSourceFilter.test(uuid)) {
+                ignoreDmlEventByGtidSource = true;
+            }
+        }
+        metrics.onGtidChange(gtid);
+    }
+
+    /**
+     * Handle the supplied event with a {@link RowsQueryEventData} by recording the original SQL query
+     * that generated the event.
+     *
+     * @param event the database change data event to be processed; may not be null
+     */
+    protected void handleRowsQuery(Event event) {
+        // Unwrap the RowsQueryEvent
+        final RowsQueryEventData lastRowsQueryEventData = unwrapData(event);
+
+        // Set the query on the source
+        offsetContext.setQuery(lastRowsQueryEventData.getQuery());
+    }
+
+    /**
+     * Handle the supplied event with a {@link QueryEventData} by possibly recording the DDL statements as changes in the
+     * MySQL schemas.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while recording the DDL statements
+     */
+    protected void handleQueryEvent(Event event) throws InterruptedException {
+        QueryEventData command = unwrapData(event);
+        LOGGER.debug("Received query command: {}", event);
+        String sql = command.getSql().trim();
+        if (sql.equalsIgnoreCase("BEGIN")) {
+            // We are starting a new transaction ...
+            offsetContext.startNextTransaction();
+            eventDispatcher.dispatchTransactionStartedEvent(offsetContext.getTransactionId(), offsetContext);
+            offsetContext.setBinlogThread(command.getThreadId());
+            if (initialEventsToSkip != 0) {
+                LOGGER.debug(
+                        "Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event",
+                        initialEventsToSkip, startingRowNumber);
+                // We are restarting, so we need to skip the events in this transaction that we processed previously...
+                skipEvent = true;
+            }
+            return;
+        }
+        if (sql.equalsIgnoreCase("COMMIT")) {
+            handleTransactionCompletion(event);
+            return;
+        }
+
+        String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
+
+        if (upperCasedStatementBegin.startsWith("XA ")) {
+            // This is an XA transaction, and we currently ignore these and do nothing ...
+            return;
+        }
+        if (connectorConfig.getDdlFilter().test(sql)) {
+            LOGGER.debug("DDL '{}' was filtered out of processing", sql);
+            return;
+        }
+        if (upperCasedStatementBegin.equals("INSERT ") || upperCasedStatementBegin.equals("UPDATE ")
+                || upperCasedStatementBegin.equals("DELETE ")) {
+            if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+                throw new DebeziumException(
+                        "Received DML '" + sql
+                                + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
+            } else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+                LOGGER.warn("Warning only: Received DML '" + sql
+                        + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
+                return;
+            } else {
+                return;
+            }
+        }
+        if (sql.equalsIgnoreCase("ROLLBACK")) {
+            // We have hit a ROLLBACK which is not supported
+            LOGGER.warn(
+                    "Rollback statements cannot be handled without binlog buffering, the connector will fail. Please check '{}' to see how to enable buffering",
+                    MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());
+        }
+
+        final List<SchemaChangeEvent> schemaChangeEvents =
+                taskContext.getSchema().parseStreamingDdl(sql, command.getDatabase(), offsetContext,
+                        clock.currentTimeAsInstant());
+        try {
+            for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
+                if (taskContext.getSchema().skipSchemaChangeEvent(schemaChangeEvent)) {
+                    continue;
+                }
+
+                final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null
+                        : schemaChangeEvent.getTables().iterator().next().id();
+                eventDispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> {
+                    try {
+                        receiver.schemaChangeEvent(schemaChangeEvent);
+                    } catch (Exception e) {
+                        throw new DebeziumException(e);
+                    }
+                });
+            }
+        } catch (InterruptedException e) {
+            LOGGER.info("Processing interrupted");
+        }
+    }
+
+    private void handleTransactionCompletion(Event event) throws InterruptedException {
+        // We are completing the transaction ...
+        eventDispatcher.dispatchTransactionCommittedEvent(offsetContext);
+        offsetContext.commitTransaction();
+        offsetContext.setBinlogThread(-1L);
+        skipEvent = false;
+        ignoreDmlEventByGtidSource = false;
+    }
+
+    /**
+     * Handle a change in the table metadata.
+     * <p>
+     * This method should be called whenever we consume a TABLE_MAP event, and every transaction in the log should include one
+     * of these for each table affected by the transaction. Each table map event includes a monotonically-increasing numeric
+     * identifier, and this identifier is used within subsequent events within the same transaction. This table identifier can
+     * change when:
+     * <ol>
+     * <li>the table structure is modified (e.g., via an {@code ALTER TABLE ...} command); or</li>
+     * <li>MySQL rotates to a new binary log file, even if the table structure does not change.</li>
+     * </ol>
+     *
+     * @param event the update event; never null
+     */
+    protected void handleUpdateTableMetadata(Event event) {
+        TableMapEventData metadata = unwrapData(event);
+        long tableNumber = metadata.getTableId();
+        String databaseName = metadata.getDatabase();
+        String tableName = metadata.getTable();
+        TableId tableId = new TableId(databaseName, null, tableName);
+        if (taskContext.getSchema().assignTableNumber(tableNumber, tableId)) {
+            LOGGER.debug("Received update table metadata event: {}", event);
+        } else {
+            informAboutUnknownTableIfRequired(event, tableId, "update table metadata");
+        }
+    }
+
+    /**
+     * If we receive an event for a table that is monitored but whose metadata we
+     * don't know, either ignore that event or raise a warning or error as per the
+     * {@link MySqlConnectorConfig#INCONSISTENT_SCHEMA_HANDLING_MODE} configuration.
+     */
+    private void informAboutUnknownTableIfRequired(Event event, TableId tableId, String typeToLog) {
+        if (tableId != null && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
+            metrics.onErroneousEvent("source = " + tableId + ", event " + event);
+            EventHeaderV4 eventHeader = event.getHeader();
+
+            if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+                LOGGER.error(
+                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        event, offsetContext.getOffset(), tableId, System.lineSeparator(), eventHeader.getPosition(),
+                        eventHeader.getNextPosition(), offsetContext.getSource().binlogFilename());
+                throw new DebeziumException("Encountered change event for table " + tableId
+                        + " whose schema isn't known to this connector");
+            } else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+                LOGGER.warn(
+                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+                                + "The event will be ignored.{}"
+                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        event, offsetContext.getOffset(), tableId, System.lineSeparator(), System.lineSeparator(),
+                        eventHeader.getPosition(), eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename());
+            } else {
+                LOGGER.debug(
+                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+                                + "The event will be ignored.{}"
+                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        event, offsetContext.getOffset(), tableId, System.lineSeparator(), System.lineSeparator(),
+                        eventHeader.getPosition(), eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename());
+            }
+        } else {
+            LOGGER.debug("Filtering {} event: {} for non-monitored table {}", typeToLog, event, tableId);
+            metrics.onFilteredEvent("source = " + tableId);
+        }
+    }
+
+    /**
+     * Generate source records for the supplied event with a {@link WriteRowsEventData}.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void handleInsert(Event event) throws InterruptedException {
+        handleChange(event, "insert", WriteRowsEventData.class, x -> taskContext.getSchema().getTableId(x.getTableId()),
+                WriteRowsEventData::getRows,
+                (tableId, row) -> eventDispatcher.dispatchDataChangeEvent(tableId,
+                        new MySqlChangeRecordEmitter(offsetContext, clock, Operation.CREATE, null, row)));
+    }
+
+    /**
+     * Generate source records for the supplied event with an {@link UpdateRowsEventData}.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void handleUpdate(Event event) throws InterruptedException {
+        handleChange(event, "update", UpdateRowsEventData.class,
+                x -> taskContext.getSchema().getTableId(x.getTableId()), UpdateRowsEventData::getRows,
+                (tableId, row) -> eventDispatcher.dispatchDataChangeEvent(tableId,
+                        new MySqlChangeRecordEmitter(offsetContext, clock, Operation.UPDATE, row.getKey(),
+                                row.getValue())));
+    }
+
+    /**
+     * Generate source records for the supplied event with a {@link DeleteRowsEventData}.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void handleDelete(Event event) throws InterruptedException {
+        handleChange(event, "delete", DeleteRowsEventData.class,
+                x -> taskContext.getSchema().getTableId(x.getTableId()), DeleteRowsEventData::getRows,
+                (tableId, row) -> eventDispatcher.dispatchDataChangeEvent(tableId,
+                        new MySqlChangeRecordEmitter(offsetContext, clock, Operation.DELETE, row, null)));
+    }
+
+    private <T extends EventData, U> void handleChange(Event event, String changeType, Class<T> eventDataClass,
+            TableIdProvider<T> tableIdProvider,
+            RowsProvider<T, U> rowsProvider, BinlogChangeEmitter<U> changeEmitter)
+            throws InterruptedException {
+        if (skipEvent) {
+            // We can skip this because we should already be at least this far ...
+            LOGGER.info("Skipping previously processed row event: {}", event);
+            return;
+        }
+        if (ignoreDmlEventByGtidSource) {
+            LOGGER.debug("Skipping DML event because this GTID source is filtered: {}", event);
+            return;
+        }
+        final T data = unwrapData(event);
+        final TableId tableId = tableIdProvider.getTableId(data);
+        final List<U> rows = rowsProvider.getRows(data);
+
+        if (tableId != null && taskContext.getSchema().schemaFor(tableId) != null) {
+            int count = 0;
+            int numRows = rows.size();
+            if (startingRowNumber < numRows) {
+                for (int row = startingRowNumber; row != numRows; ++row) {
+                    offsetContext.setRowNumber(row, numRows);
+                    offsetContext.event(tableId, eventTimestamp);
+                    changeEmitter.emit(tableId, rows.get(row));
+                    count++;
+                }
+                if (LOGGER.isDebugEnabled()) {
+                    if (startingRowNumber != 0) {
+                        LOGGER.debug("Emitted {} {} record(s) for last {} row(s) in event: {}",
+                                count, changeType, numRows - startingRowNumber, event);
+                    } else {
+                        LOGGER.debug("Emitted {} {} record(s) for event: {}", count, changeType, event);
+                    }
+                }
+                offsetContext.changeEventCompleted();
+            } else {
+                // All rows were previously processed ...
+                LOGGER.debug("Skipping previously processed {} event: {}", changeType, event);
+            }
+        } else {
+            informAboutUnknownTableIfRequired(event, tableId, changeType + " row");
+        }
+        startingRowNumber = 0;
+    }
+
+    /**
+     * Handle a {@link EventType#VIEW_CHANGE} event.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void viewChange(Event event) throws InterruptedException {
+        LOGGER.debug("View Change event: {}", event);
+        // do nothing
+    }
+
+    /**
+     * Handle a {@link EventType#XA_PREPARE} event.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void prepareTransaction(Event event) throws InterruptedException {
+        LOGGER.debug("XA Prepare event: {}", event);
+        // do nothing
+    }
+
+    private SSLMode sslModeFor(SecureConnectionMode mode) {
+        switch (mode) {
+            case DISABLED:
+                return SSLMode.DISABLED;
+            case PREFERRED:
+                return SSLMode.PREFERRED;
+            case REQUIRED:
+                return SSLMode.REQUIRED;
+            case VERIFY_CA:
+                return SSLMode.VERIFY_CA;
+            case VERIFY_IDENTITY:
+                return SSLMode.VERIFY_IDENTITY;
+        }
+        return null;
+    }
+
+    @Override
+    public void execute(ChangeEventSourceContext context) throws InterruptedException {
+        if (!connectorConfig.getSnapshotMode().shouldStream()) {
+            LOGGER.info("Streaming is disabled for snapshot mode {}", connectorConfig.getSnapshotMode());
+            return;
+        }
+        taskContext.getSchema().assureNonEmptySchema();
+        final Set<Operation> skippedOperations = connectorConfig.getSkippedOps();
+
+        // Register our event handlers ...
+        eventHandlers.put(EventType.STOP, this::handleServerStop);
+        eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
+        eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
+        eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
+        eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
+        eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
+
+        if (!skippedOperations.contains(Operation.CREATE)) {
+            eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
+            eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
+        }
+
+        if (!skippedOperations.contains(Operation.UPDATE)) {
+            eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
+            eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
+        }
+
+        if (!skippedOperations.contains(Operation.DELETE)) {
+            eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
+            eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
+        }
+
+        eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
+        eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
+        eventHandlers.put(EventType.XID, this::handleTransactionCompletion);
+
+        // Conditionally register ROWS_QUERY handler to parse SQL statements.
+        if (connectorConfig.includeSqlQuery()) {
+            eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
+        }
+
+        client.registerEventListener(connectorConfig.bufferSizeForStreamingChangeEventSource() == 0
+                ? this::handleEvent
+                : (new EventBuffer(connectorConfig.bufferSizeForStreamingChangeEventSource(), this, context))::add);
+
+        client.registerLifecycleListener(new ReaderThreadLifecycleListener());
+        client.registerEventListener(this::onEvent);
+        if (LOGGER.isDebugEnabled()) {
+            client.registerEventListener(this::logEvent);
+        }
+
+        final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
+        metrics.setIsGtidModeEnabled(isGtidModeEnabled);
+
+        // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based on the
+        // last Debezium checkpoint.
+        String availableServerGtidStr = connection.knownGtidSet();
+        if (isGtidModeEnabled) {
+            // The server is using GTIDs, so enable the handler ...
+            eventHandlers.put(EventType.GTID, this::handleGtidEvent);
+
+            // Now look at the GTID set from the server and what we've previously seen ...
+            GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);
+
+            // also take into account purged GTID logs
+            GtidSet purgedServerGtidSet = connection.purgedGtidSet();
+            LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet);
+
+            GtidSet filteredGtidSet = filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
+            if (filteredGtidSet != null) {
+                // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
+                LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
+                String filteredGtidSetStr = filteredGtidSet.toString();
+                client.setGtidSet(filteredGtidSetStr);
+                offsetContext.setCompletedGtidSet(filteredGtidSetStr);
+                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
+            } else {
+                // We've not yet seen any GTIDs, so that means we have to start reading the binlog
+                // from the beginning ...
+                client.setBinlogFilename(offsetContext.getSource().binlogFilename());
+                client.setBinlogPosition(offsetContext.getSource().binlogPosition());
+                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
+            }
+        } else {
+            // The server is not using GTIDs, so start reading the binlog based upon where we last left off ...
+            client.setBinlogFilename(offsetContext.getSource().binlogFilename());
+            client.setBinlogPosition(offsetContext.getSource().binlogPosition());
+        }
+
+        // We may be restarting in the middle of a transaction, so see how far into the transaction we have already
+        // processed...
+        initialEventsToSkip = offsetContext.eventsToSkipUponRestart();
+        LOGGER.info("Skip {} events on streaming start", initialEventsToSkip);
+
+        // Set the starting row number, which is the next row number to be read ...
+        startingRowNumber = offsetContext.rowsToSkipUponRestart();
+        LOGGER.info("Skip {} rows on streaming start", startingRowNumber);
+
+        // Only when we reach the first BEGIN event will we start to skip events ...
+        skipEvent = false;
+
+        try {
+            // Start the log reader, which starts background threads ...
+            if (context.isRunning()) {
+                long timeout = connectorConfig.getConnectionTimeout().toMillis();
+                long started = clock.currentTimeInMillis();
+                try {
+                    LOGGER.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
+                    client.connect(timeout);
+                    // Need to wait for the keepalive thread to be running, otherwise it can be left
+                    // orphaned. The problem is timing: when close is called too soon after connect,
+                    // the keepalive thread is not terminated.
+                    if (client.isKeepAlive()) {
+                        LOGGER.info("Waiting for keepalive thread to start");
+                        final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock);
+                        int waitAttempts = 50;
+                        boolean keepAliveThreadRunning = false;
+                        while (!keepAliveThreadRunning && waitAttempts-- > 0) {
+                            for (Thread t : binaryLogClientThreads.values()) {
+                                if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) {
+                                    LOGGER.info("Keepalive thread is running");
+                                    keepAliveThreadRunning = true;
+                                }
+                            }
+                            metronome.pause();
+                        }
+                    }
+                } catch (TimeoutException e) {
+                    // If the client thread is interrupted *before* the client could connect, the client
+                    // throws a timeout exception. The only way we can distinguish this is if we get the
+                    // timeout exception before the specified timeout has elapsed, so we simply check
+                    // this (within 10%) ...
+                    long duration = clock.currentTimeInMillis() - started;
+                    if (duration > (0.9 * timeout)) {
+                        double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
+                        throw new DebeziumException(
+                                "Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
+                                        connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+                                        + connectorConfig.username() + "'",
+                                e);
+                    }
+                    // Otherwise, we were told to shut down, so we don't care about the timeout exception
+                } catch (AuthenticationException e) {
+                    throw new DebeziumException("Failed to authenticate to the MySQL database at " +
+                            connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+                            + connectorConfig.username() + "'", e);
+                } catch (Throwable e) {
+                    throw new DebeziumException("Unable to connect to the MySQL database at " +
+                            connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+                            + connectorConfig.username() + "': " + e.getMessage(), e);
+                }
+            }
+            while (context.isRunning()) {
+                Thread.sleep(100);
+            }
+        } finally {
+            try {
+                client.disconnect();
+            } catch (Exception e) {
+                LOGGER.info("Exception while stopping binary log client", e);
+            }
+        }
+    }
+
+    private SSLSocketFactory getBinlogSslSocketFactory(MySqlConnectorConfig connectorConfig,
+            MySqlConnection connection) {
+        String acceptedTlsVersion = connection.getSessionVariableForSslVersion();
+        if (!isNullOrEmpty(acceptedTlsVersion)) {
+            SSLMode sslMode = sslModeFor(connectorConfig.sslMode());
+
+            // Keystore settings can also be passed via system properties, so we need to read them
+            final String password = System.getProperty("javax.net.ssl.keyStorePassword");
+            final String keyFilename = System.getProperty("javax.net.ssl.keyStore");
+            KeyManager[] keyManagers = null;
+            if (keyFilename != null) {
+                final char[] passwordArray = (password == null) ? null : password.toCharArray();
+                try {
+                    KeyStore ks = KeyStore.getInstance("JKS");
+                    ks.load(new FileInputStream(keyFilename), passwordArray);
+
+                    KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509");
+                    kmf.init(ks, passwordArray);
+
+                    keyManagers = kmf.getKeyManagers();
+                } catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException
+                        | UnrecoverableKeyException e) {
+                    throw new DebeziumException("Could not load keystore", e);
+                }
+            }
+
+            // DBZ-1208 Resembles the logic from the upstream BinaryLogClient, only that
+            // the accepted TLS version is passed to the constructed factory
+            if (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED) {
+                final KeyManager[] finalKMS = keyManagers;
+                return new DefaultSSLSocketFactory(acceptedTlsVersion) {
+
+                    @Override
+                    protected void initSSLContext(SSLContext sc)
+                            throws GeneralSecurityException {
+                        sc.init(finalKMS, new TrustManager[]{
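+                                // Trust-all manager: in PREFERRED/REQUIRED mode the connection
+                                // is encrypted but certificates are intentionally not verified,
+                                // so every check below is a no-op.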
+                                new X509TrustManager() {
+
+                                    @Override
+                                    public void checkClientTrusted(
+                                            X509Certificate[] x509Certificates,
+                                            String s)
+                                            throws CertificateException {
+                                    }
+
+                                    @Override
+                                    public void checkServerTrusted(
+                                            X509Certificate[] x509Certificates,
+                                            String s)
+                                            throws CertificateException {
+                                    }
+
+                                    @Override
+                                    public X509Certificate[] getAcceptedIssuers() {
+                                        return new X509Certificate[0];
+                                    }
+                                }
+                        }, null);
+                    }
+                };
+            } else {
+                return new DefaultSSLSocketFactory(acceptedTlsVersion);
+            }
+        }
+
+        return null;
+    }
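[Editor's note: the factory above reads the standard JSSE key-store system properties, so a client key can be supplied without code changes. A hypothetical sketch — paths and password are placeholders, not part of this patch:

    // Either on the JVM command line:
    //   -Djavax.net.ssl.keyStore=/etc/mysql-cdc/client-keystore.jks
    //   -Djavax.net.ssl.keyStorePassword=changeit
    // or programmatically, before the binlog client connects:
    System.setProperty("javax.net.ssl.keyStore", "/etc/mysql-cdc/client-keystore.jks");
    System.setProperty("javax.net.ssl.keyStorePassword", "changeit");
]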
+
+    private void logStreamingSourceState() {
+        logStreamingSourceState(Level.ERROR);
+    }
+
+    protected void logEvent(Event event) {
+        LOGGER.trace("Received event: {}", event);
+    }
+
+    private void logStreamingSourceState(Level severity) {
+        final Object position = client == null ? "N/A" : client.getBinlogFilename() + "/" + client.getBinlogPosition();
+        final String message =
+                "Error during binlog processing. Last offset stored = {}, binlog reader near position = {}";
+        switch (severity) {
+            case WARN:
+                LOGGER.warn(message, lastOffset, position);
+                break;
+            case DEBUG:
+                LOGGER.debug(message, lastOffset, position);
+                break;
+            default:
+                LOGGER.error(message, lastOffset, position);
+        }
+    }
+
+    /**
+     * Apply the include/exclude GTID source filters to the current {@link #source() GTID set} and merge them onto the
+     * currently available GTID set from a MySQL server.
+     *
+     * The merging behavior of this method might seem a bit strange at first. It's required in order for Debezium to consume a
+     * MySQL binlog that has multi-source replication enabled, if a failover has to occur. In such a case, the server that
+     * Debezium is failed over to might have a different set of sources, but still include the sources required for Debezium
+     * to continue to function. MySQL does not allow downstream replicas to connect if the GTID set does not contain GTIDs for
+     * all channels that the server is replicating from, even if the server does have the data needed by the client. To get
+     * around this, we can have Debezium merge its GTID set with whatever is on the server, so that MySQL will allow it to
+     * connect. See <a href="https://issues.jboss.org/browse/DBZ-143">DBZ-143</a> for details.
+     *
+     * This method does not mutate any state in the context.
+     *
+     * @param availableServerGtidSet the GTID set currently available in the MySQL server
+     * @param purgedServerGtid the GTID set already purged by the MySQL server
+     * @return A GTID set meant for consuming from a MySQL binlog; may return null if the SourceInfo has no GTIDs and therefore
+     *         none were filtered
+     */
+    public GtidSet filterGtidSet(GtidSet availableServerGtidSet, GtidSet purgedServerGtid) {
+        String gtidStr = offsetContext.gtidSet();
+        if (gtidStr == null) {
+            return null;
+        }
+        LOGGER.info("Attempting to generate a filtered GTID set");
+        LOGGER.info("GTID set from previous recorded offset: {}", gtidStr);
+        GtidSet filteredGtidSet = new GtidSet(gtidStr);
+        Predicate<String> gtidSourceFilter = connectorConfig.gtidSourceFilter();
+        if (gtidSourceFilter != null) {
+            filteredGtidSet = filteredGtidSet.retainAll(gtidSourceFilter);
+            LOGGER.info("GTID set after applying GTID source includes/excludes to previous recorded offset: {}",
+                    filteredGtidSet);
+        }
+        LOGGER.info("GTID set available on server: {}", availableServerGtidSet);
+
+        GtidSet mergedGtidSet;
+
+        if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) {
+            final GtidSet knownGtidSet = filteredGtidSet;
+            LOGGER.info("Using first available positions for new GTID channels");
+            final GtidSet relevantAvailableServerGtidSet =
+                    (gtidSourceFilter != null) ? availableServerGtidSet.retainAll(gtidSourceFilter)
+                            : availableServerGtidSet;
+            LOGGER.info("Relevant GTID set available on server: {}", relevantAvailableServerGtidSet);
+
+            // Since the GTID set recorded in the checkpoint represents only the records the
+            // CDC job has executed, in certain scenarios (such as when the startup mode is
+            // earliest/timestamp/binlogfile) the recorded GTID set may not start from the
+            // beginning, e.g. A:300-500. During job recovery, however, we usually only care
+            // about the last consumed point and must not re-consume A:1-299. Therefore the
+            // offset recorded in the checkpoint needs some adjustment, and the GTIDs available
+            // for the other MySQL instances should be merged in.
+            mergedGtidSet = GtidUtils.fixRestoredGtidSet(
+                    GtidUtils.mergeGtidSetInto(
+                            relevantAvailableServerGtidSet.retainAll(
+                                    uuid -> knownGtidSet.forServerWithId(uuid) != null),
+                            purgedServerGtid),
+                    filteredGtidSet);
+        } else {
+            mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
+        }
+
+        LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet);
+        return mergedGtidSet;
+    }
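[Editor's note: for illustration, a minimal sketch of the EARLIEST-mode merge above for a checkpoint that starts mid-stream. The values are made up, and the exact semantics of GtidUtils.fixRestoredGtidSet (completing each restored interval against the server-derived set) are assumed from the comments, not verified against its implementation:

    // Restored from the checkpoint: only transactions 300-500 of server A were consumed.
    GtidSet restored = new GtidSet("a1a1a1a1-0000-0000-0000-000000000001:300-500");
    // Available on the server, which also replicates from a second source B.
    GtidSet available = new GtidSet("a1a1a1a1-0000-0000-0000-000000000001:1-1000,"
            + "b2b2b2b2-0000-0000-0000-000000000002:1-200");
    // Already purged from the server's binlog.
    GtidSet purged = new GtidSet("a1a1a1a1-0000-0000-0000-000000000001:1-100");

    // Keep only sources the checkpoint knows about (B is dropped), merge in the
    // purged set, then complete the restored intervals against the result. The
    // expected outcome is the server-A set 1-500, so MySQL resumes at transaction
    // 501 instead of replaying (possibly purged) transactions 1-299.
    GtidSet merged = GtidUtils.fixRestoredGtidSet(
            GtidUtils.mergeGtidSetInto(
                    available.retainAll(uuid -> restored.forServerWithId(uuid) != null),
                    purged),
            restored);
]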
+
+    MySqlStreamingChangeEventSourceMetrics getMetrics() {
+        return metrics;
+    }
+
+    void rewindBinaryLogClient(ChangeEventSourceContext context, BinlogPosition position) {
+        try {
+            if (context.isRunning()) {
+                LOGGER.debug("Rewinding binlog to position {}", position);
+                client.disconnect();
+                client.setBinlogFilename(position.getFilename());
+                client.setBinlogPosition(position.getPosition());
+                client.connect();
+            }
+        } catch (IOException e) {
+            LOGGER.error("Unexpected error when re-connecting to the MySQL binary log reader", e);
+        }
+    }
+
+    BinlogPosition getCurrentBinlogPosition() {
+        return new BinlogPosition(client.getBinlogFilename(), client.getBinlogPosition());
+    }
+
+    /**
+     * Wraps the specified exception in a {@link DebeziumException}, ensuring that all useful state is captured inside
+     * the new exception's message.
+     *
+     * @param error the exception; may not be null
+     * @return the wrapped {@link DebeziumException}
+     */
+    protected DebeziumException wrap(Throwable error) {
+        assert error != null;
+        String msg = error.getMessage();
+        if (error instanceof ServerException) {
+            ServerException e = (ServerException) error;
+            msg = msg + " Error code: " + e.getErrorCode() + "; SQLSTATE: " + e.getSqlState() + ".";
+        } else if (error instanceof SQLException) {
+            SQLException e = (SQLException) error;
+            msg = e.getMessage() + " Error code: " + e.getErrorCode() + "; SQLSTATE: " + e.getSQLState() + ".";
+        }
+        return new DebeziumException(msg, error);
+    }
+
+    protected final class ReaderThreadLifecycleListener implements LifecycleListener {
+
+        @Override
+        public void onDisconnect(BinaryLogClient client) {
+            if (LOGGER.isInfoEnabled()) {
+                taskContext.temporaryLoggingContext(connectorConfig, "binlog", () -> {
+                    Map<String, ?> offset = lastOffset;
+                    if (offset != null) {
+                        LOGGER.info("Stopped reading binlog after {} events, last recorded offset: {}",
+                                totalRecordCounter, offset);
+                    } else {
+                        LOGGER.info("Stopped reading binlog after {} events, no new offset was recorded",
+                                totalRecordCounter);
+                    }
+                });
+            }
+        }
+
+        @Override
+        public void onConnect(BinaryLogClient client) {
+            // Set up the MDC logging context for this thread ...
+            taskContext.configureLoggingContext("binlog");
+
+            // The event row number will be used when processing the first event ...
+            LOGGER.info("Connected to MySQL binlog at {}:{}, starting at {}", connectorConfig.hostname(),
+                    connectorConfig.port(), offsetContext);
+        }
+
+        @Override
+        public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
+            LOGGER.debug("A communication failure event arrived", ex);
+            logStreamingSourceState();
+            try {
+                // Stop BinaryLogClient background threads
+                client.disconnect();
+            } catch (final Exception e) {
+                LOGGER.debug("Exception while closing client", e);
+            }
+            errorHandler.setProducerThrowable(wrap(ex));
+        }
+
+        @Override
+        public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
+            if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+                LOGGER.debug("A deserialization failure event arrived", ex);
+                logStreamingSourceState();
+                errorHandler.setProducerThrowable(wrap(ex));
+            } else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+                LOGGER.warn("A deserialization failure event arrived", ex);
+                logStreamingSourceState(Level.WARN);
+            } else {
+                LOGGER.debug("A deserialization failure event arrived", ex);
+                logStreamingSourceState(Level.DEBUG);
+            }
+        }
+    }
+
+    protected void setRestoredFromCheckpoint() {
+        this.isRestoredFromCheckpoint = true;
+    }
+
+    @FunctionalInterface
+    private interface TableIdProvider<E extends EventData> {
+
+        TableId getTableId(E data);
+    }
+
+    @FunctionalInterface
+    private interface RowsProvider<E extends EventData, U> {
+
+        List<U> getRows(E data);
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
index 7082845a56..e07bc3d656 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
@@ -28,6 +28,7 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient;
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.connector.mysql.GtidSet;
+import io.debezium.connector.mysql.GtidUtils;
 import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
 import io.debezium.connector.mysql.MySqlConnection;
 import io.debezium.connector.mysql.MySqlConnectorConfig;
@@ -219,10 +220,20 @@ public class StatefulTaskContext {
                     "Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
             return false;
         }
-        // GTIDs are enabled
-        GtidSet gtidSet = new GtidSet(gtidStr);
         // Get the GTID set that is available in the server ...
         GtidSet availableGtidSet = new GtidSet(availableGtidStr);
+
+        // GTIDs are enabled
+        LOG.info("Merging server GTID set {} with restored GTID set {}", availableGtidSet, gtidStr);
+
+        // Based on the GTID set currently available on the server, the GTID set restored into
+        // MySqlOffsetContext is adjusted to ensure it is complete. This addresses the failure
+        // to recover from a checkpoint under certain startup modes (e.g. 'earliest-offset',
+        // 'timestamp', or 'specific-offset'), where the restored GTID set does not start from
+        // the beginning of each server's transaction range.
+        GtidSet gtidSet = GtidUtils.fixRestoredGtidSet(availableGtidSet, new GtidSet(gtidStr));
+        LOG.info("Merged GTID set is {}", gtidSet);
+
         if (gtidSet.isContainedWithin(availableGtidSet)) {
             LOG.info(
                     "MySQL current GTID set {} does contain the GTID set {} required by the connector.",
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 5328121085..4ad493afe9 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -438,6 +438,8 @@
       inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/FinishedSnapshotSplitInfo.java
       inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeFetcher.java
       inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
+      inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
+      inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
       inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java
       inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
       inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/ColumnImpl.java