You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/06/26 02:11:54 UTC
[inlong] branch master updated: [INLONG-8307][Sort] Fix job restart failed from savepoint when set 'scan.startup.mode' = 'timestamp|earliest-offset|specific-offset' (#8308)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cf4b01e3bd [INLONG-8307][Sort] Fix job restart failed from savepoint when set 'scan.startup.mode' = 'timestamp|earliest-offset|specific-offset' (#8308)
cf4b01e3bd is described below
commit cf4b01e3bdb3bdbc61c5043fab3d275c44c882f0
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Mon Jun 26 10:11:49 2023 +0800
[INLONG-8307][Sort] Fix job restart failed from savepoint when set 'scan.startup.mode' = 'timestamp|earliest-offset|specific-offset' (#8308)
---
.../sort-connectors/mysql-cdc/pom.xml | 1 +
.../io/debezium/connector/mysql/GtidUtils.java | 94 ++
.../mysql/MySqlStreamingChangeEventSource.java | 1262 ++++++++++++++++++++
.../debezium/task/context/StatefulTaskContext.java | 15 +-
licenses/inlong-sort-connectors/LICENSE | 2 +
5 files changed, 1372 insertions(+), 2 deletions(-)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/pom.xml
index 92c9a28d08..23a5c165a6 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/pom.xml
@@ -118,6 +118,7 @@
<artifact>org.apache.inlong:sort-connector-*</artifact>
<includes>
<include>org/apache/inlong/**</include>
+ <include>io/debezium/connector/mysql/**</include>
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
</includes>
</filter>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
new file mode 100644
index 0000000000..52a6d5dace
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql;
+
+import io.debezium.connector.mysql.GtidSet.UUIDSet;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for handling GTIDs. */
+public class GtidUtils {
+
+    /**
+     * Corrects a GTID set that has been restored from a state or checkpoint, using the GTID set
+     * fetched from the server via SHOW MASTER STATUS, so that the restored set does not exceed the
+     * server's set.
+     *
+     * <p>The result starts out as a copy of the server's UUID sets. For each UUID in the restored
+     * GTID set:
+     *
+     * <ul>
+     *   <li>if the UUID also exists on the server, the server's intervals are clipped at the
+     *       highest interval end found in the restored UUID set: intervals ending at or below that
+     *       value are kept, an interval straddling it is truncated to end there, and intervals
+     *       starting above it are dropped;
+     *   <li>if the UUID does not exist on the server, the restored UUID set is added unchanged.
+     * </ul>
+     *
+     * <p>UUIDs that exist only in the server's GTID set are kept unchanged.
+     *
+     * @param serverGtidSet the GTID set reported by the server
+     * @param restoredGtidSet the GTID set restored from state or checkpoint
+     * @return a new GTID set built as described above
+     * @throws java.util.NoSuchElementException if a restored UUID set that also exists on the
+     *     server has no intervals
+     */
+    public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
+        Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
+        serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
+        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
+            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
+            if (serverUuidSet == null) {
+                // Unknown to the server: keep the restored UUID set as it is.
+                newSet.put(uuidSet.getUUID(), uuidSet);
+                continue;
+            }
+
+            long restoredIntervalEnd = getIntervalEnd(uuidSet);
+            List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals = new ArrayList<>();
+            for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) {
+                if (serverInterval.getEnd() <= restoredIntervalEnd) {
+                    // Entirely covered by the restored position: keep the whole interval.
+                    newIntervals.add(
+                            new com.github.shyiko.mysql.binlog.GtidSet.Interval(
+                                    serverInterval.getStart(), serverInterval.getEnd()));
+                } else if (serverInterval.getStart() <= restoredIntervalEnd) {
+                    // Straddles the restored position: truncate it at that position.
+                    newIntervals.add(
+                            new com.github.shyiko.mysql.binlog.GtidSet.Interval(
+                                    serverInterval.getStart(), restoredIntervalEnd));
+                }
+                // Intervals starting after the restored position are dropped.
+            }
+            // Replace the server's entry exactly once with the clipped intervals.
+            newSet.put(
+                    uuidSet.getUUID(),
+                    new UUIDSet(
+                            new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
+                                    uuidSet.getUUID(), newIntervals)));
+        }
+        return new GtidSet(newSet);
+    }
+
+    /**
+     * Merges one GTID set (toMerge) into another (base), without overwriting the existing
+     * elements in the base GTID set: a UUID set from {@code toMerge} is only added when
+     * {@code base} has no UUID set with the same UUID.
+     *
+     * @param base the GTID set whose UUID sets always win
+     * @param toMerge the GTID set providing UUID sets that are missing in {@code base}
+     * @return a new GTID set containing the merged UUID sets
+     */
+    public static GtidSet mergeGtidSetInto(GtidSet base, GtidSet toMerge) {
+        Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
+        base.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
+        for (GtidSet.UUIDSet uuidSet : toMerge.getUUIDSets()) {
+            newSet.putIfAbsent(uuidSet.getUUID(), uuidSet);
+        }
+        return new GtidSet(newSet);
+    }
+
+    /**
+     * Returns the largest interval end among all intervals of the given UUID set.
+     *
+     * @throws java.util.NoSuchElementException if the UUID set has no intervals
+     */
+    private static long getIntervalEnd(GtidSet.UUIDSet uuidSet) {
+        return uuidSet.getIntervals().stream()
+                .mapToLong(GtidSet.Interval::getEnd)
+                .max()
+                .getAsLong();
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
new file mode 100644
index 0000000000..16da062352
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -0,0 +1,1262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventData;
+import com.github.shyiko.mysql.binlog.event.EventHeader;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.GtidEventData;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
+import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+import com.github.shyiko.mysql.binlog.network.AuthenticationException;
+import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
+import com.github.shyiko.mysql.binlog.network.SSLMode;
+import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
+import com.github.shyiko.mysql.binlog.network.ServerException;
+import io.debezium.DebeziumException;
+import io.debezium.annotation.SingleThreadAccess;
+import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode;
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
+import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
+import io.debezium.data.Envelope.Operation;
+import io.debezium.function.BlockingConsumer;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
+import io.debezium.relational.TableId;
+import io.debezium.schema.SchemaChangeEvent;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import io.debezium.util.Strings;
+import io.debezium.util.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import static io.debezium.util.Strings.isNullOrEmpty;
+
+/**
+ * Copied from Debezium project to fix
+ * https://github.com/apache/inlong/issues/8307.
+ *
+ * <p>Lines 1134-1139: Adjust the GTID merging logic to support recovering a job that previously
+ * specified a starting offset at startup.
+ */
+public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStreamingChangeEventSource.class);
+
+ private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
+
+ // Handlers keyed by binlog event type; handleEvent() falls back to ignoreEvent() when no handler is registered.
+ private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class);
+ private final BinaryLogClient client;
+ private final MySqlStreamingChangeEventSourceMetrics metrics;
+ private final Clock clock;
+ private final EventProcessingFailureHandlingMode eventDeserializationFailureHandlingMode;
+ private final EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode;
+
+ // Restart bookkeeping: while skipEvent is set, handleEvent() decrements initialEventsToSkip after every
+ // processed event and clears skipEvent once the counter is no longer positive.
+ private int startingRowNumber = 0;
+ private long initialEventsToSkip = 0L;
+ private boolean skipEvent = false;
+ // Re-evaluated on every GTID event: set when the GTID's source UUID is rejected by gtidDmlSourceFilter.
+ private boolean ignoreDmlEventByGtidSource = false;
+ // null unless MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS is enabled (see constructor).
+ private final Predicate<String> gtidDmlSourceFilter;
+ private final AtomicLong totalRecordCounter = new AtomicLong();
+ private volatile Map<String, ?> lastOffset = null;
+ // GTID set maintained by this class; handleGtidEvent() adds every GTID seen in the binlog to it.
+ private com.github.shyiko.mysql.binlog.GtidSet gtidSet;
+ // Fraction of the keep-alive interval that the constructor uses as the client's heartbeat interval.
+ private final float heartbeatIntervalFactor = 0.8f;
+ // Threads created through the client's thread factory, keyed by thread name.
+ private final Map<String, Thread> binaryLogClientThreads = new ConcurrentHashMap<>(4);
+ private final MySqlTaskContext taskContext;
+ private final MySqlConnectorConfig connectorConfig;
+ private final MySqlConnection connection;
+ private final EventDispatcher<TableId> eventDispatcher;
+ private final MySqlOffsetContext offsetContext;
+ private final ErrorHandler errorHandler;
+ private boolean isRestoredFromCheckpoint = false;
+
+ // Timestamp of the event currently handled by handleEvent(); null for HEARTBEAT events.
+ @SingleThreadAccess("binlog client thread")
+ private Instant eventTimestamp;
+
+ /**
+  * Immutable pair of a binlog file name and a position within that file. Two instances are equal
+  * when both the file name and the position are equal.
+  */
+ public static class BinlogPosition {
+
+     final String filename;
+     final long position;
+
+     /**
+      * @param filename the binlog file name; must not be null
+      * @param position the position within the binlog file
+      */
+     public BinlogPosition(String filename, long position) {
+         assert filename != null;
+
+         this.filename = filename;
+         this.position = position;
+     }
+
+     public String getFilename() {
+         return filename;
+     }
+
+     public long getPosition() {
+         return position;
+     }
+
+     /** Renders the position as {@code <filename>/<position>}. */
+     @Override
+     public String toString() {
+         return filename + "/" + position;
+     }
+
+     @Override
+     public int hashCode() {
+         // Same 31-based combination as before: seed 1, then the file name, then the folded position.
+         final int fileHash = 31 + filename.hashCode();
+         return 31 * fileHash + Long.hashCode(position);
+     }
+
+     @Override
+     public boolean equals(Object obj) {
+         if (this == obj) {
+             return true;
+         }
+         if (obj == null || getClass() != obj.getClass()) {
+             return false;
+         }
+         final BinlogPosition that = (BinlogPosition) obj;
+         return filename.equals(that.filename) && position == that.position;
+     }
+ }
+
+ /**
+  * Callback that accepts a table id together with a payload of type {@code T}.
+  *
+  * @param <T> the type of the payload handed to {@link #emit(TableId, Object)}
+  */
+ @FunctionalInterface
+ private interface BinlogChangeEmitter<T> {
+
+     /**
+      * @param tableId the id of the table the payload belongs to
+      * @param data the payload to emit
+      * @throws InterruptedException if the calling thread is interrupted while emitting
+      */
+     void emit(TableId tableId, T data) throws InterruptedException;
+ }
+
+ /**
+ * Creates the streaming change event source and configures the {@link BinaryLogClient} obtained from the
+ * task context: thread factory, server id, SSL settings, keep-alive and heartbeat intervals, and a custom
+ * event deserializer.
+ * <p>
+ * The deserializer remembers the most recent {@link TableMapEventData} per table number for the row
+ * deserializers and turns an {@link EventDataDeserializationException} into a pseudo event of type
+ * {@code INCIDENT}, unless the failure was caused by an {@link IOException}, in which case it is rethrown.
+ *
+ * @param connectorConfig the connector configuration
+ * @param offsetContext the offset context to continue from; if null an initial context is created
+ * @param connection the MySQL connection, also used when building the SSL socket factory
+ * @param dispatcher the dispatcher that receives the events produced by this source
+ * @param errorHandler the handler that is notified about processing failures
+ * @param clock the clock used for timing
+ * @param taskContext the task context providing the binlog client
+ * @param metrics the streaming metrics to update
+ */
+ public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlOffsetContext offsetContext,
+ MySqlConnection connection,
+ EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock,
+ MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics) {
+
+ this.taskContext = taskContext;
+ this.connectorConfig = connectorConfig;
+ this.connection = connection;
+ this.clock = clock;
+ this.eventDispatcher = dispatcher;
+ this.errorHandler = errorHandler;
+ // With snapshot mode NEVER the initial context is not created by snapshot
+ this.offsetContext = (offsetContext == null) ? MySqlOffsetContext.initial(connectorConfig) : offsetContext;
+ this.metrics = metrics;
+
+ eventDeserializationFailureHandlingMode = connectorConfig.getEventProcessingFailureHandlingMode();
+ inconsistentSchemaHandlingMode = connectorConfig.inconsistentSchemaFailureHandlingMode();
+
+ // Set up the log reader ...
+ client = taskContext.getBinaryLogClient();
+ // BinaryLogClient will overwrite thread names later
+ client.setThreadFactory(
+ Threads.threadFactory(MySqlConnector.class, connectorConfig.getLogicalName(), "binlog-client", false,
+ false,
+ x -> binaryLogClientThreads.put(x.getName(), x)));
+ client.setServerId(connectorConfig.serverId());
+ client.setSSLMode(sslModeFor(connectorConfig.sslMode()));
+ if (connectorConfig.sslModeEnabled()) {
+ SSLSocketFactory sslSocketFactory = getBinlogSslSocketFactory(connectorConfig, connection);
+ if (sslSocketFactory != null) {
+ client.setSslSocketFactory(sslSocketFactory);
+ }
+ }
+ Configuration configuration = connectorConfig.getConfig();
+ client.setKeepAlive(configuration.getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
+ final long keepAliveInterval = configuration.getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
+ client.setKeepAliveInterval(keepAliveInterval);
+ // Considering heartbeatInterval should be less than keepAliveInterval, we use the heartbeatIntervalFactor
+ // multiply by keepAliveInterval and set the result value to heartbeatInterval.The default value of
+ // heartbeatIntervalFactor
+ // is 0.8, and we believe the left time (0.2 * keepAliveInterval) is enough to process the packet received from
+ // the MySQL server.
+ client.setHeartbeatInterval((long) (keepAliveInterval * heartbeatIntervalFactor));
+
+ // The DML source filter stays null (disabled) unless filtering DML events by GTID source is enabled
+ boolean filterDmlEventsByGtidSource =
+ configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
+ gtidDmlSourceFilter = filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null;
+
+ // Set up the event deserializer with additional type(s) ...
+ final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
+ EventDeserializer eventDeserializer = new EventDeserializer() {
+
+ @Override
+ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
+ try {
+ // Delegate to the superclass ...
+ Event event = super.nextEvent(inputStream);
+
+ // We have to record the most recent TableMapEventData for each table number for our custom
+ // deserializers ...
+ if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
+ TableMapEventData tableMapEvent = event.getData();
+ tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
+ }
+ return event;
+ }
+ // DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
+ catch (EventDataDeserializationException edde) {
+ // DBZ-3095 As of Java 15, when reaching EOF in the binlog stream, the polling loop in
+ // BinaryLogClient#listenForEventPackets() keeps returning values != -1 from peek();
+ // this causes the loop to never finish
+ // Propagating the exception (either EOF or socket closed) causes the loop to be aborted
+ // in this case
+ if (edde.getCause() instanceof IOException) {
+ throw edde;
+ }
+
+ // Build an INCIDENT header that carries over the metadata of the event that failed to deserialize
+ EventHeaderV4 header = new EventHeaderV4();
+ header.setEventType(EventType.INCIDENT);
+ header.setTimestamp(edde.getEventHeader().getTimestamp());
+ header.setServerId(edde.getEventHeader().getServerId());
+
+ if (edde.getEventHeader() instanceof EventHeaderV4) {
+ header.setEventLength(((EventHeaderV4) edde.getEventHeader()).getEventLength());
+ header.setNextPosition(((EventHeaderV4) edde.getEventHeader()).getNextPosition());
+ header.setFlags(((EventHeaderV4) edde.getEventHeader()).getFlags());
+ }
+
+ EventData data = new EventDataDeserializationExceptionData(edde);
+ return new Event(header, data);
+ }
+ }
+ };
+
+ // Add our custom deserializers ...
+ eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
+ eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
+ eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
+ new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
+ eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
+ new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
+ eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
+ new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
+ eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
+ new RowDeserializers.WriteRowsDeserializer(
+ tableMapEventByTableId).setMayContainExtraInformation(true));
+ eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
+ new RowDeserializers.UpdateRowsDeserializer(
+ tableMapEventByTableId).setMayContainExtraInformation(true));
+ eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
+ new RowDeserializers.DeleteRowsDeserializer(
+ tableMapEventByTableId).setMayContainExtraInformation(true));
+ client.setEventDeserializer(eventDeserializer);
+ }
+
+ protected void onEvent(Event event) {
+ long ts = 0;
+
+ if (event.getHeader().getEventType() == EventType.HEARTBEAT) {
+ // HEARTBEAT events have no timestamp but are fired only when
+ // there is no traffic on the connection which means we are caught-up
+ // https://dev.mysql.com/doc/internals/en/heartbeat-event.html
+ metrics.setMilliSecondsBehindSource(ts);
+ return;
+ }
+
+ // MySQL has seconds resolution but mysql-binlog-connector-java returns
+ // a value in milliseconds
+ long eventTs = event.getHeader().getTimestamp();
+
+ if (eventTs == 0) {
+ LOGGER.trace("Received unexpected event with 0 timestamp: {}", event);
+ return;
+ }
+
+ ts = clock.currentTimeInMillis() - eventTs;
+ LOGGER.trace("Current milliseconds behind source: {} ms", ts);
+ metrics.setMilliSecondsBehindSource(ts);
+ }
+
+ /**
+ * Fallback used by {@code handleEvent} for event types without a registered handler; it only traces the event.
+ *
+ * @param event the event that has no handler
+ */
+ protected void ignoreEvent(Event event) {
+ LOGGER.trace("Ignoring event due to missing handler: {}", event);
+ }
+
+ /**
+ * Processes a single binlog event: updates the offset context from the event header, forwards the event to
+ * the handler registered for its type (or to {@link #ignoreEvent(Event)} if there is none), dispatches a
+ * heartbeat and marks the event as completed.
+ * <p>
+ * A {@link RuntimeException} thrown while handling is reported to the error handler, and an
+ * {@link InterruptedException} restores the interrupt flag. In both cases all event handlers are cleared
+ * so that subsequent events are ignored.
+ *
+ * @param event the event to process; null events are ignored
+ */
+ protected void handleEvent(Event event) {
+ if (event == null) {
+ return;
+ }
+
+ final EventHeader eventHeader = event.getHeader();
+ // Update the source offset info. Note that the client returns the value in *milliseconds*, even though the
+ // binlog
+ // contains only *seconds* precision ...
+ // HEARTBEAT events have no timestamp; only set the timestamp if the event is not a HEARTBEAT
+ eventTimestamp = !eventHeader.getEventType().equals(EventType.HEARTBEAT)
+ ? Instant.ofEpochMilli(eventHeader.getTimestamp())
+ : null;
+ offsetContext.setBinlogThread(eventHeader.getServerId());
+
+ final EventType eventType = eventHeader.getEventType();
+ if (eventType == EventType.ROTATE) {
+ // A ROTATE event names the binlog file and position to continue from, so it resets the start point
+ EventData eventData = event.getData();
+ RotateEventData rotateEventData;
+ if (eventData instanceof EventDeserializer.EventDataWrapper) {
+ rotateEventData = (RotateEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal();
+ } else {
+ rotateEventData = (RotateEventData) eventData;
+ }
+ offsetContext.setBinlogStartPoint(rotateEventData.getBinlogFilename(), rotateEventData.getBinlogPosition());
+ } else if (eventHeader instanceof EventHeaderV4) {
+ EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
+ offsetContext.setEventPosition(trackableEventHeader.getPosition(), trackableEventHeader.getEventLength());
+ }
+
+ // If there is a handler for this event, forward the event to it ...
+ try {
+ // Forward the event to the handler ...
+ eventHandlers.getOrDefault(eventType, this::ignoreEvent).accept(event);
+
+ // Generate heartbeat message if the time is right
+ eventDispatcher.dispatchHeartbeatEvent(offsetContext);
+
+ // Capture that we've completed another event ...
+ offsetContext.completeEvent();
+
+ if (skipEvent) {
+ // We're in the mode of skipping events and we just skipped this one, so decrement our skip count ...
+ --initialEventsToSkip;
+ skipEvent = initialEventsToSkip > 0;
+ }
+ } catch (RuntimeException e) {
+ // There was an error in the event handler, so propagate the failure to Kafka Connect ...
+ logStreamingSourceState();
+ errorHandler.setProducerThrowable(new DebeziumException("Error processing binlog event", e));
+ // Do not stop the client, since Kafka Connect should stop the connector on it's own
+ // (and doing it here may cause problems the second time it is stopped).
+ // We can clear the listeners though so that we ignore all future events ...
+ eventHandlers.clear();
+ LOGGER.info(
+ "Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored.");
+ } catch (InterruptedException e) {
+ // Most likely because this reader was stopped and our thread was interrupted ...
+ Thread.currentThread().interrupt();
+ eventHandlers.clear();
+ LOGGER.info("Stopped processing binlog events due to thread interruption");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T extends EventData> T unwrapData(Event event) {
+ EventData eventData = event.getData();
+ if (eventData instanceof EventDeserializer.EventDataWrapper) {
+ eventData = ((EventDeserializer.EventDataWrapper) eventData).getInternal();
+ }
+ return (T) eventData;
+ }
+
+ /**
+ * Handle the supplied event that signals that mysqld has stopped. The event is only logged at debug level.
+ *
+ * @param event the server stopped event to be processed; may not be null
+ */
+ protected void handleServerStop(Event event) {
+ LOGGER.debug("Server stopped: {}", event);
+ }
+
+ /**
+ * Handle the supplied event that is sent by a primary to a replica to let the replica know that the primary is still alive. Not
+ * written to a binary log. The event is only logged at trace level.
+ *
+ * @param event the server heartbeat event to be processed; may not be null
+ */
+ protected void handleServerHeartbeat(Event event) {
+ LOGGER.trace("Server heartbeat: {}", event);
+ }
+
+ /**
+ * Handle the supplied event that signals that an out of the ordinary event that occurred on the master. It notifies the replica
+ * that something happened on the primary that might cause data to be in an inconsistent state.
+ *
+ * @param event the server stopped event to be processed; may not be null
+ */
+ protected void handleServerIncident(Event event) {
+ if (event.getData() instanceof EventDataDeserializationExceptionData) {
+ metrics.onErroneousEvent("source = " + event.toString());
+ EventDataDeserializationExceptionData data = event.getData();
+
+ EventHeaderV4 eventHeader = (EventHeaderV4) data.getCause().getEventHeader(); // safe cast, instantiated
+ // that ourselves
+
+ // logging some additional context but not the exception itself, this will happen in handleEvent()
+ if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+ LOGGER.error(
+ "Error while deserializing binlog event at offset {}.{}" +
+ "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+ offsetContext.getOffset(),
+ System.lineSeparator(),
+ eventHeader.getPosition(),
+ eventHeader.getNextPosition(),
+ offsetContext.getSource().binlogFilename());
+
+ throw new RuntimeException(data.getCause());
+ } else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+ LOGGER.warn(
+ "Error while deserializing binlog event at offset {}.{}" +
+ "This exception will be ignored and the event be skipped.{}" +
+ "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+ offsetContext.getOffset(),
+ System.lineSeparator(),
+ System.lineSeparator(),
+ eventHeader.getPosition(),
+ eventHeader.getNextPosition(),
+ offsetContext.getSource().binlogFilename(),
+ data.getCause());
+ }
+ } else {
+ LOGGER.error("Server incident: {}", event);
+ }
+ }
+
+ /**
+ * Handle the supplied event with a {@link RotateEventData} that signals the logs are being rotated. This means that either
+ * the server was restarted, or the binlog has transitioned to a new file. In either case, subsequent table numbers will be
+ * different than those seen to this point.
+ *
+ * @param event the database change data event to be processed; may not be null
+ */
+ protected void handleRotateLogsEvent(Event event) {
+ LOGGER.debug("Rotating logs: {}", event);
+ RotateEventData command = unwrapData(event);
+ assert command != null;
+ taskContext.getSchema().clearTableMappings();
+ }
+
+ /**
+ * Handle the supplied event with a {@link GtidEventData} that signals the beginning of a GTID transaction.
+ * We don't yet know whether this transaction contains any events we're interested in, but we have to record
+ * it so that we know the position of this event and know we've processed the binlog to this point.
+ * <p>
+ * Note that this captures the current GTID and complete GTID set, regardless of whether the connector is
+ * {@link MySqlTaskContext#gtidSourceFilter() filtering} the GTID set upon connection. We do this because
+ * we actually want to capture all GTID set values found in the binlog, whether or not we process them.
+ * However, only when we connect do we actually want to pass to MySQL only those GTID ranges that are applicable
+ * per the configuration.
+ *
+ * @param event the GTID event to be processed; may not be null
+ */
+ protected void handleGtidEvent(Event event) {
+ LOGGER.debug("GTID transaction: {}", event);
+ GtidEventData gtidEvent = unwrapData(event);
+ String gtid = gtidEvent.getGtid();
+ gtidSet.add(gtid);
+ offsetContext.startGtid(gtid, gtidSet.toString()); // rather than use the client's GTID set
+ ignoreDmlEventByGtidSource = false;
+ if (gtidDmlSourceFilter != null && gtid != null) {
+ String uuid = gtid.trim().substring(0, gtid.indexOf(":"));
+ if (!gtidDmlSourceFilter.test(uuid)) {
+ ignoreDmlEventByGtidSource = true;
+ }
+ }
+ metrics.onGtidChange(gtid);
+ }
+
+ /**
+  * Handle the supplied event with an {@link RowsQueryEventData} by storing the original SQL query that
+  * generated the event in the offset context.
+  *
+  * @param event the database change data event to be processed; may not be null
+  */
+ protected void handleRowsQuery(Event event) {
+     final RowsQueryEventData rowsQuery = unwrapData(event);
+     final String originalSql = rowsQuery.getQuery();
+
+     // Make the query available on the source information
+     offsetContext.setQuery(originalSql);
+ }
+
+ /**
+  * Handle the supplied event with an {@link QueryEventData} by possibly recording the DDL statements as changes in the
+  * MySQL schemas.
+  * <p>
+  * {@code BEGIN} starts a new transaction and {@code COMMIT} completes it. XA statements and statements
+  * rejected by the DDL filter are ignored. INSERT/UPDATE/DELETE statements indicate statement or mixed
+  * based replication and are rejected, warned about or ignored depending on the failure handling mode.
+  * Everything else is parsed as DDL and dispatched as schema change events.
+  *
+  * @param event the database change data event to be processed; may not be null
+  * @throws InterruptedException if this thread is interrupted while recording the DDL statements
+  */
+ protected void handleQueryEvent(Event event) throws InterruptedException {
+     QueryEventData command = unwrapData(event);
+     LOGGER.debug("Received query command: {}", event);
+     String sql = command.getSql().trim();
+     if (sql.equalsIgnoreCase("BEGIN")) {
+         // We are starting a new transaction ...
+         offsetContext.startNextTransaction();
+         eventDispatcher.dispatchTransactionStartedEvent(offsetContext.getTransactionId(), offsetContext);
+         offsetContext.setBinlogThread(command.getThreadId());
+         if (initialEventsToSkip != 0) {
+             LOGGER.debug(
+                     "Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event",
+                     initialEventsToSkip, startingRowNumber);
+             // We are restarting, so we need to skip the events in this transaction that we processed previously...
+             skipEvent = true;
+         }
+         return;
+     }
+     if (sql.equalsIgnoreCase("COMMIT")) {
+         handleTransactionCompletion(event);
+         return;
+     }
+
+     String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
+
+     if (upperCasedStatementBegin.startsWith("XA ")) {
+         // This is an XA transaction, and we currently ignore these and do nothing ...
+         return;
+     }
+     if (connectorConfig.getDdlFilter().test(sql)) {
+         LOGGER.debug("DDL '{}' was filtered out of processing", sql);
+         return;
+     }
+     if (upperCasedStatementBegin.equals("INSERT ") || upperCasedStatementBegin.equals("UPDATE ")
+             || upperCasedStatementBegin.equals("DELETE ")) {
+         if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+             throw new DebeziumException(
+                     "Received DML '" + sql
+                             + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
+         }
+         if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+             LOGGER.warn(
+                     "Warning only: Received DML '{}' for processing, binlog probably contains events generated with statement or mixed based replication format",
+                     sql);
+         }
+         // In every non-failing mode the DML statement itself is skipped
+         return;
+     }
+     if (sql.equalsIgnoreCase("ROLLBACK")) {
+         // We have hit a ROLLBACK which is not supported
+         LOGGER.warn(
+                 "Rollback statements cannot be handled without binlog buffering, the connector will fail. Please check '{}' to see how to enable buffering",
+                 MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());
+     }
+
+     final List<SchemaChangeEvent> schemaChangeEvents =
+             taskContext.getSchema().parseStreamingDdl(sql, command.getDatabase(), offsetContext,
+                     clock.currentTimeAsInstant());
+     try {
+         for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
+             if (taskContext.getSchema().skipSchemaChangeEvent(schemaChangeEvent)) {
+                 continue;
+             }
+
+             final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null
+                     : schemaChangeEvent.getTables().iterator().next().id();
+             eventDispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> {
+                 try {
+                     receiver.schemaChangeEvent(schemaChangeEvent);
+                 } catch (Exception e) {
+                     throw new DebeziumException(e);
+                 }
+             });
+         }
+     } catch (InterruptedException e) {
+         // Restore the interrupt status so that the caller and any later blocking call can still
+         // observe the interruption instead of it being silently lost here.
+         Thread.currentThread().interrupt();
+         LOGGER.info("Processing interrupted");
+     }
+ }
+
+ /**
+  * Finishes the transaction currently being read from the binlog.
+  * <p>
+  * Called for a {@code COMMIT} query and registered as the handler for {@code XID} events.
+  *
+  * @param event the event that ended the transaction; not inspected by this method
+  * @throws InterruptedException declared for the dispatcher call
+  */
+ private void handleTransactionCompletion(Event event) throws InterruptedException {
+     // Publish the commit first, then move the offset state past the transaction.
+     eventDispatcher.dispatchTransactionCommittedEvent(offsetContext);
+     offsetContext.commitTransaction();
+     // Reset the binlog thread id that was recorded when the transaction began.
+     offsetContext.setBinlogThread(-1L);
+
+     // The next transaction starts without any pending skip or GTID-source filtering.
+     ignoreDmlEventByGtidSource = false;
+     skipEvent = false;
+ }
+
+ /**
+ * Handle a change in the table metadata.
+ * <p>
+ * This method should be called whenever we consume a TABLE_MAP event, and every transaction in the log should include one
+ * of these for each table affected by the transaction. Each table map event includes a monotonically-increasing numeric
+ * identifier, and this identifier is used within subsequent events within the same transaction. This table identifier can
+ * change when:
+ * <ol>
+ * <li>the table structure is modified (e.g., via an {@code ALTER TABLE ...} command); or</li>
+ * <li>MySQL rotates to a new binary log file, even if the table structure does not change.</li>
+ * </ol>
+ *
+ * @param event the update event; never null
+ */
+ protected void handleUpdateTableMetadata(Event event) {
+ TableMapEventData metadata = unwrapData(event);
+ long tableNumber = metadata.getTableId();
+ String databaseName = metadata.getDatabase();
+ String tableName = metadata.getTable();
+ TableId tableId = new TableId(databaseName, null, tableName);
+ if (taskContext.getSchema().assignTableNumber(tableNumber, tableId)) {
+ LOGGER.debug("Received update table metadata event: {}", event);
+ } else {
+ informAboutUnknownTableIfRequired(event, tableId, "update table metadata");
+ }
+ }
+
+ /**
+ * If we receive an event for a table that is monitored but whose metadata we
+ * don't know, either ignore that event or raise a warning or error as per the
+ * {@link MySqlConnectorConfig#INCONSISTENT_SCHEMA_HANDLING_MODE} configuration.
+ */
+ private void informAboutUnknownTableIfRequired(Event event, TableId tableId, String typeToLog) {
+ if (tableId != null && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
+ metrics.onErroneousEvent("source = " + tableId + ", event " + event);
+ EventHeaderV4 eventHeader = event.getHeader();
+
+ if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+ LOGGER.error(
+ "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+ + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+ event, offsetContext.getOffset(), tableId, System.lineSeparator(), eventHeader.getPosition(),
+ eventHeader.getNextPosition(), offsetContext.getSource().binlogFilename());
+ throw new DebeziumException("Encountered change event for table " + tableId
+ + " whose schema isn't known to this connector");
+ } else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+ LOGGER.warn(
+ "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+ + "The event will be ignored.{}"
+ + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+ event, offsetContext.getOffset(), tableId, System.lineSeparator(), System.lineSeparator(),
+ eventHeader.getPosition(), eventHeader.getNextPosition(),
+ offsetContext.getSource().binlogFilename());
+ } else {
+ LOGGER.debug(
+ "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+ + "The event will be ignored.{}"
+ + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+ event, offsetContext.getOffset(), tableId, System.lineSeparator(), System.lineSeparator(),
+ eventHeader.getPosition(), eventHeader.getNextPosition(),
+ offsetContext.getSource().binlogFilename());
+ }
+ } else {
+ LOGGER.debug("Filtering {} event: {} for non-monitored table {}", typeToLog, event, tableId);
+ metrics.onFilteredEvent("source = " + tableId);
+ }
+ }
+
+ /**
+  * Dispatches a CREATE change record for each row of a {@link WriteRowsEventData} event.
+  *
+  * @param event the database change data event to be processed; may not be null
+  * @throws InterruptedException if this thread is interrupted while blocking
+  */
+ protected void handleInsert(Event event) throws InterruptedException {
+     handleChange(
+             event,
+             "insert",
+             WriteRowsEventData.class,
+             data -> taskContext.getSchema().getTableId(data.getTableId()),
+             WriteRowsEventData::getRows,
+             (table, after) -> eventDispatcher.dispatchDataChangeEvent(
+                     table,
+                     new MySqlChangeRecordEmitter(offsetContext, clock, Operation.CREATE, null, after)));
+ }
+
+ /**
+  * Dispatches an UPDATE change record for each row pair of an {@link UpdateRowsEventData} event.
+  * The key of each pair is passed as the "before" image and the value as the "after" image.
+  *
+  * @param event the database change data event to be processed; may not be null
+  * @throws InterruptedException if this thread is interrupted while blocking
+  */
+ protected void handleUpdate(Event event) throws InterruptedException {
+     handleChange(
+             event,
+             "update",
+             UpdateRowsEventData.class,
+             data -> taskContext.getSchema().getTableId(data.getTableId()),
+             UpdateRowsEventData::getRows,
+             (table, change) -> eventDispatcher.dispatchDataChangeEvent(
+                     table,
+                     new MySqlChangeRecordEmitter(offsetContext, clock, Operation.UPDATE,
+                             change.getKey(), change.getValue())));
+ }
+
+ /**
+  * Dispatches a DELETE change record for each row of a {@link DeleteRowsEventData} event.
+  *
+  * @param event the database change data event to be processed; may not be null
+  * @throws InterruptedException if this thread is interrupted while blocking
+  */
+ protected void handleDelete(Event event) throws InterruptedException {
+     handleChange(
+             event,
+             "delete",
+             DeleteRowsEventData.class,
+             data -> taskContext.getSchema().getTableId(data.getTableId()),
+             DeleteRowsEventData::getRows,
+             (table, before) -> eventDispatcher.dispatchDataChangeEvent(
+                     table,
+                     new MySqlChangeRecordEmitter(offsetContext, clock, Operation.DELETE, before, null)));
+ }
+
+ /**
+  * Common implementation behind the insert, update and delete handlers.
+  * <p>
+  * Row events are dropped while {@code skipEvent} or {@code ignoreDmlEventByGtidSource} is set.
+  * Otherwise the rows of the event, starting at {@code startingRowNumber}, are handed to the
+  * emitter one by one; {@code startingRowNumber} is then reset to 0 so that only the first event
+  * handled after a restart is partially skipped.
+  *
+  * @param event the row event
+  * @param changeType "insert", "update" or "delete"; used in log messages only
+  * @param eventDataClass the payload class; fixes the type parameter {@code T}
+  * @param tableIdProvider resolves the table the payload belongs to
+  * @param rowsProvider extracts the rows from the payload
+  * @param changeEmitter emits a change record for one row
+  * @throws InterruptedException if the emitter is interrupted
+  */
+ private <T extends EventData, U> void handleChange(Event event, String changeType, Class<T> eventDataClass,
+         TableIdProvider<T> tableIdProvider,
+         RowsProvider<T, U> rowsProvider, BinlogChangeEmitter<U> changeEmitter)
+         throws InterruptedException {
+     if (skipEvent) {
+         // Already processed before the restart, so we are at least this far.
+         LOGGER.info("Skipping previously processed row event: {}", event);
+         return;
+     }
+     if (ignoreDmlEventByGtidSource) {
+         LOGGER.debug("Skipping DML event because this GTID source is filtered: {}", event);
+         return;
+     }
+
+     final T payload = unwrapData(event);
+     final TableId tableId = tableIdProvider.getTableId(payload);
+     final List<U> rows = rowsProvider.getRows(payload);
+     final boolean knownTable = tableId != null && taskContext.getSchema().schemaFor(tableId) != null;
+
+     if (!knownTable) {
+         informAboutUnknownTableIfRequired(event, tableId, changeType + " row");
+     } else if (startingRowNumber >= rows.size()) {
+         // Every row of this event was emitted before the restart.
+         LOGGER.debug("Skipping previously processed {} event: {}", changeType, event);
+     } else {
+         final int numRows = rows.size();
+         final int firstRow = startingRowNumber;
+         int emitted = 0;
+         for (int index = firstRow; index < numRows; index++) {
+             offsetContext.setRowNumber(index, numRows);
+             offsetContext.event(tableId, eventTimestamp);
+             changeEmitter.emit(tableId, rows.get(index));
+             emitted++;
+         }
+         if (LOGGER.isDebugEnabled()) {
+             if (firstRow == 0) {
+                 LOGGER.debug("Emitted {} {} record(s) for event: {}", emitted, changeType, event);
+             } else {
+                 LOGGER.debug("Emitted {} {} record(s) for last {} row(s) in event: {}",
+                         emitted, changeType, numRows - firstRow, event);
+             }
+         }
+         offsetContext.changeEventCompleted();
+     }
+     startingRowNumber = 0;
+ }
+
+ /**
+ * Handle a {@link EventType#VIEW_CHANGE} event.
+ * <p>
+ * The event is only logged at debug level; no change event is produced for it.
+ *
+ * @param event the database change data event to be processed; may not be null
+ * @throws InterruptedException declared in the signature; the current body does not throw it
+ */
+ protected void viewChange(Event event) throws InterruptedException {
+ LOGGER.debug("View Change event: {}", event);
+ // do nothing
+ }
+
+ /**
+ * Handle a {@link EventType#XA_PREPARE} event.
+ * <p>
+ * The event is only logged at debug level; no change event is produced for it.
+ *
+ * @param event the database change data event to be processed; may not be null
+ * @throws InterruptedException declared in the signature; the current body does not throw it
+ */
+ protected void prepareTransaction(Event event) throws InterruptedException {
+ LOGGER.debug("XA Prepare event: {}", event);
+ // do nothing
+ }
+
+ /**
+  * Translates a {@link SecureConnectionMode} into the {@link SSLMode} constant of the same name.
+  *
+  * @param mode the configured secure connection mode
+  * @return the matching SSL mode, or {@code null} when no case matches
+  */
+ private SSLMode sslModeFor(SecureConnectionMode mode) {
+     SSLMode translated = null;
+     switch (mode) {
+         case DISABLED:
+             translated = SSLMode.DISABLED;
+             break;
+         case PREFERRED:
+             translated = SSLMode.PREFERRED;
+             break;
+         case REQUIRED:
+             translated = SSLMode.REQUIRED;
+             break;
+         case VERIFY_CA:
+             translated = SSLMode.VERIFY_CA;
+             break;
+         case VERIFY_IDENTITY:
+             translated = SSLMode.VERIFY_IDENTITY;
+             break;
+         default:
+             break;
+     }
+     return translated;
+ }
+
+ /**
+ * Runs the binlog streaming phase: registers the event handlers, positions the binlog client
+ * (by GTID set or by binlog file name and position), connects, and then blocks until the
+ * context is no longer running. The client is disconnected on the way out.
+ * <p>
+ * Returns immediately, without connecting, when the configured snapshot mode does not stream.
+ *
+ * @param context the change event source context; its {@code isRunning()} flag controls how
+ *            long this method blocks
+ * @throws InterruptedException if the thread is interrupted while waiting
+ * @throws DebeziumException if connecting to MySQL times out, fails authentication, or fails
+ *             for any other reason
+ */
+ @Override
+ public void execute(ChangeEventSourceContext context) throws InterruptedException {
+ if (!connectorConfig.getSnapshotMode().shouldStream()) {
+ LOGGER.info("Streaming is disabled for snapshot mode {}", connectorConfig.getSnapshotMode());
+ return;
+ }
+ taskContext.getSchema().assureNonEmptySchema();
+ final Set<Operation> skippedOperations = connectorConfig.getSkippedOps();
+
+ // Register our event handlers ...
+ eventHandlers.put(EventType.STOP, this::handleServerStop);
+ eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
+ eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
+ eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
+ eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
+ eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
+
+ // Row handlers are registered only for operations that are not configured to be skipped.
+ if (!skippedOperations.contains(Operation.CREATE)) {
+ eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
+ eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
+ }
+
+ if (!skippedOperations.contains(Operation.UPDATE)) {
+ eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
+ eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
+ }
+
+ if (!skippedOperations.contains(Operation.DELETE)) {
+ eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
+ eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
+ }
+
+ eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
+ eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
+ eventHandlers.put(EventType.XID, this::handleTransactionCompletion);
+
+ // Conditionally register ROWS_QUERY handler to parse SQL statements.
+ if (connectorConfig.includeSqlQuery()) {
+ eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
+ }
+
+ // With a buffer size of 0 events go straight to handleEvent; otherwise they are routed
+ // through an EventBuffer of the configured size.
+ client.registerEventListener(connectorConfig.bufferSizeForStreamingChangeEventSource() == 0
+ ? this::handleEvent
+ : (new EventBuffer(connectorConfig.bufferSizeForStreamingChangeEventSource(), this, context))::add);
+
+ client.registerLifecycleListener(new ReaderThreadLifecycleListener());
+ client.registerEventListener(this::onEvent);
+ if (LOGGER.isDebugEnabled()) {
+ client.registerEventListener(this::logEvent);
+ }
+
+ final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
+ metrics.setIsGtidModeEnabled(isGtidModeEnabled);
+
+ // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium
+ // checkpoint.
+ String availableServerGtidStr = connection.knownGtidSet();
+ if (isGtidModeEnabled) {
+ // The server is using GTIDs, so enable the handler ...
+ eventHandlers.put(EventType.GTID, this::handleGtidEvent);
+
+ // Now look at the GTID set from the server and what we've previously seen ...
+ GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);
+
+ // also take into account purged GTID logs
+ GtidSet purgedServerGtidSet = connection.purgedGtidSet();
+ LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet);
+
+ GtidSet filteredGtidSet = filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
+ if (filteredGtidSet != null) {
+ // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
+ LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
+ String filteredGtidSetStr = filteredGtidSet.toString();
+ client.setGtidSet(filteredGtidSetStr);
+ offsetContext.setCompletedGtidSet(filteredGtidSetStr);
+ gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
+ } else {
+ // No GTID set is recorded in the offset, so position the client by the binlog file name
+ // and position stored in the offset instead ...
+ client.setBinlogFilename(offsetContext.getSource().binlogFilename());
+ client.setBinlogPosition(offsetContext.getSource().binlogPosition());
+ gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
+ }
+ } else {
+ // The server is not using GTIDs, so start reading the binlog based upon where we last left off ...
+ client.setBinlogFilename(offsetContext.getSource().binlogFilename());
+ client.setBinlogPosition(offsetContext.getSource().binlogPosition());
+ }
+
+ // We may be restarting in the middle of a transaction, so see how far into the transaction we have already
+ // processed...
+ initialEventsToSkip = offsetContext.eventsToSkipUponRestart();
+ LOGGER.info("Skip {} events on streaming start", initialEventsToSkip);
+
+ // Set the starting row number, which is the next row number to be read ...
+ startingRowNumber = offsetContext.rowsToSkipUponRestart();
+ LOGGER.info("Skip {} rows on streaming start", startingRowNumber);
+
+ // Only when we reach the first BEGIN event will we start to skip events ...
+ skipEvent = false;
+
+ try {
+ // Start the log reader, which starts background threads ...
+ if (context.isRunning()) {
+ long timeout = connectorConfig.getConnectionTimeout().toMillis();
+ long started = clock.currentTimeInMillis();
+ try {
+ LOGGER.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
+ client.connect(timeout);
+ // Need to wait for keepalive thread to be running, otherwise it can be left orphaned
+ // The problem is with timing. When the close is called too early after connect then
+ // the keepalive thread is not terminated
+ if (client.isKeepAlive()) {
+ LOGGER.info("Waiting for keepalive thread to start");
+ // Poll every 100 ms, for at most 50 attempts; the loop gives up silently after that.
+ final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock);
+ int waitAttempts = 50;
+ boolean keepAliveThreadRunning = false;
+ while (!keepAliveThreadRunning && waitAttempts-- > 0) {
+ for (Thread t : binaryLogClientThreads.values()) {
+ if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) {
+ LOGGER.info("Keepalive thread is running");
+ keepAliveThreadRunning = true;
+ }
+ }
+ metronome.pause();
+ }
+ }
+ } catch (TimeoutException e) {
+ // If the client thread is interrupted *before* the client could connect, the client
+ // throws a timeout exception. The only way we can distinguish this from a real timeout
+ // is that it arrives before the specified timeout has elapsed, so we simply check
+ // this (within 10%) ...
+ long duration = clock.currentTimeInMillis() - started;
+ if (duration > (0.9 * timeout)) {
+ double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
+ throw new DebeziumException(
+ "Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
+ connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+ + connectorConfig.username() + "'",
+ e);
+ }
+ // Otherwise, we were told to shutdown, so we don't care about the timeout exception
+ } catch (AuthenticationException e) {
+ throw new DebeziumException("Failed to authenticate to the MySQL database at " +
+ connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+ + connectorConfig.username() + "'", e);
+ } catch (Throwable e) {
+ throw new DebeziumException("Unable to connect to the MySQL database at " +
+ connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+ + connectorConfig.username() + "': " + e.getMessage(), e);
+ }
+ }
+ // Block here, polling every 100 ms, until the context is no longer running.
+ while (context.isRunning()) {
+ Thread.sleep(100);
+ }
+ } finally {
+ // Always disconnect, whether we stopped normally, failed to connect, or were interrupted.
+ try {
+ client.disconnect();
+ } catch (Exception e) {
+ LOGGER.info("Exception while stopping binary log client", e);
+ }
+ }
+ }
+
+ /**
+  * Builds the SSL socket factory for the binlog client, passing it the TLS version reported by
+  * the given connection.
+  * <p>
+  * A client keystore is loaded when the {@code javax.net.ssl.keyStore} system property is set
+  * (password taken from {@code javax.net.ssl.keyStorePassword}). Its key managers are only used
+  * for the {@code PREFERRED} and {@code REQUIRED} SSL modes; for every other mode a default
+  * factory for the TLS version is returned.
+  *
+  * @param connectorConfig the connector configuration supplying the SSL mode
+  * @param connection the connection used to look up the session's TLS version
+  * @return the socket factory, or {@code null} when the connection reports no TLS version
+  * @throws DebeziumException if the keystore cannot be read or initialised
+  */
+ private SSLSocketFactory getBinlogSslSocketFactory(MySqlConnectorConfig connectorConfig,
+         MySqlConnection connection) {
+     String acceptedTlsVersion = connection.getSessionVariableForSslVersion();
+     if (isNullOrEmpty(acceptedTlsVersion)) {
+         return null;
+     }
+     SSLMode sslMode = sslModeFor(connectorConfig.sslMode());
+
+     // Keystore settings can be passed via system properties too so we need to read them
+     final String password = System.getProperty("javax.net.ssl.keyStorePassword");
+     final String keyFilename = System.getProperty("javax.net.ssl.keyStore");
+     KeyManager[] keyManagers = null;
+     if (keyFilename != null) {
+         final char[] passwordArray = (password == null) ? null : password.toCharArray();
+         try {
+             KeyStore ks = KeyStore.getInstance("JKS");
+             // Close the keystore file once it is loaded; the stream used to be left open.
+             try (FileInputStream keyStoreStream = new FileInputStream(keyFilename)) {
+                 ks.load(keyStoreStream, passwordArray);
+             }
+
+             KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509");
+             kmf.init(ks, passwordArray);
+
+             keyManagers = kmf.getKeyManagers();
+         } catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException
+                 | UnrecoverableKeyException e) {
+             throw new DebeziumException("Could not load keystore", e);
+         }
+     }
+
+     // DBZ-1208 Resembles the logic from the upstream BinaryLogClient, only that
+     // the accepted TLS version is passed to the constructed factory
+     if (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED) {
+         final KeyManager[] finalKMS = keyManagers;
+         return new DefaultSSLSocketFactory(acceptedTlsVersion) {
+
+             @Override
+             protected void initSSLContext(SSLContext sc)
+                     throws GeneralSecurityException {
+                 // NOTE(review): this trust manager accepts every certificate, so these two
+                 // modes encrypt the connection without verifying the server.
+                 sc.init(finalKMS, new TrustManager[]{
+                         new X509TrustManager() {
+
+                             @Override
+                             public void checkClientTrusted(
+                                     X509Certificate[] x509Certificates,
+                                     String s)
+                                     throws CertificateException {
+                             }
+
+                             @Override
+                             public void checkServerTrusted(
+                                     X509Certificate[] x509Certificates,
+                                     String s)
+                                     throws CertificateException {
+                             }
+
+                             @Override
+                             public X509Certificate[] getAcceptedIssuers() {
+                                 return new X509Certificate[0];
+                             }
+                         }
+                 }, null);
+             }
+         };
+     }
+     return new DefaultSSLSocketFactory(acceptedTlsVersion);
+ }
+
+ /**
+ * Logs the current streaming state at ERROR level; shorthand for
+ * {@code logStreamingSourceState(Level.ERROR)}.
+ */
+ private void logStreamingSourceState() {
+ logStreamingSourceState(Level.ERROR);
+ }
+
+ /**
+ * Logs a received binlog event at TRACE level.
+ * <p>
+ * Note that {@code execute} registers this listener when DEBUG logging is enabled, while the
+ * message itself is emitted at TRACE.
+ *
+ * @param event the event received from the binlog client
+ */
+ protected void logEvent(Event event) {
+ LOGGER.trace("Received event: {}", event);
+ }
+
+ /**
+  * Logs the last stored offset together with the binlog client's current file and position.
+  *
+  * @param severity the level to log at; {@code WARN} and {@code DEBUG} are honoured, any other
+  *            level is logged as an error
+  */
+ private void logStreamingSourceState(Level severity) {
+     final Object readerPosition;
+     if (client == null) {
+         readerPosition = "N/A";
+     } else {
+         readerPosition = client.getBinlogFilename() + "/" + client.getBinlogPosition();
+     }
+     final String template =
+             "Error during binlog processing. Last offset stored = {}, binlog reader near position = {}";
+
+     switch (severity) {
+         case DEBUG:
+             LOGGER.debug(template, lastOffset, readerPosition);
+             break;
+         case WARN:
+             LOGGER.warn(template, lastOffset, readerPosition);
+             break;
+         default:
+             LOGGER.error(template, lastOffset, readerPosition);
+     }
+ }
+
+ /**
+ * Apply the include/exclude GTID source filters to the GTID set recorded in the current offset and merge the
+ * result with the GTID set currently available on the MySQL server.
+ *
+ * The merging behavior of this method might seem a bit strange at first. It's required in order for Debezium to consume a
+ * MySQL binlog that has multi-source replication enabled, if a failover has to occur. In such a case, the server that
+ * Debezium is failed over to might have a different set of sources, but still include the sources required for Debezium
+ * to continue to function. MySQL does not allow downstream replicas to connect if the GTID set does not contain GTIDs for
+ * all channels that the server is replicating from, even if the server does have the data needed by the client. To get
+ * around this, we can have Debezium merge its GTID set with whatever is on the server, so that MySQL will allow it to
+ * connect. See <a href="https://issues.jboss.org/browse/DBZ-143">DBZ-143</a> for details.
+ *
+ * When the new-channel position is {@code EARLIEST}, the merged set is additionally passed through
+ * {@code GtidUtils.fixRestoredGtidSet} together with the filtered offset GTID set (see the comment in the body).
+ *
+ * This method does not mutate any state in the context.
+ *
+ * @param availableServerGtidSet the GTID set currently available in the MySQL server
+ * @param purgedServerGtid the GTID set already purged by the MySQL server
+ * @return A GTID set meant for consuming from a MySQL binlog; may return null if the SourceInfo has no GTIDs and therefore
+ * none were filtered
+ */
+ public GtidSet filterGtidSet(GtidSet availableServerGtidSet, GtidSet purgedServerGtid) {
+ String gtidStr = offsetContext.gtidSet();
+ if (gtidStr == null) {
+ return null;
+ }
+ LOGGER.info("Attempting to generate a filtered GTID set");
+ LOGGER.info("GTID set from previous recorded offset: {}", gtidStr);
+ GtidSet filteredGtidSet = new GtidSet(gtidStr);
+ Predicate<String> gtidSourceFilter = connectorConfig.gtidSourceFilter();
+ if (gtidSourceFilter != null) {
+ filteredGtidSet = filteredGtidSet.retainAll(gtidSourceFilter);
+ LOGGER.info("GTID set after applying GTID source includes/excludes to previous recorded offset: {}",
+ filteredGtidSet);
+ }
+ LOGGER.info("GTID set available on server: {}", availableServerGtidSet);
+
+ GtidSet mergedGtidSet;
+
+ if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) {
+ final GtidSet knownGtidSet = filteredGtidSet;
+ LOGGER.info("Using first available positions for new GTID channels");
+ final GtidSet relevantAvailableServerGtidSet =
+ (gtidSourceFilter != null) ? availableServerGtidSet.retainAll(gtidSourceFilter)
+ : availableServerGtidSet;
+ LOGGER.info("Relevant GTID set available on server: {}", relevantAvailableServerGtidSet);
+
+ // The GTID set recorded in the checkpoint only covers the transactions the CDC job has
+ // actually read. In some startup modes (earliest-offset, timestamp, specific binlog file)
+ // it therefore may not start from the beginning, e.g. A:300-500. On recovery we usually
+ // only care about resuming from the last consumed position rather than re-reading
+ // A:1-299, so the offset recorded in the checkpoint is adjusted and the GTIDs available
+ // for the other MySQL instances are completed.
+ // Only server UUIDs already present in the recorded set are kept from the server's set
+ // before it is merged with the purged set.
+ mergedGtidSet = GtidUtils.fixRestoredGtidSet(
+ GtidUtils.mergeGtidSetInto(
+ relevantAvailableServerGtidSet.retainAll(
+ uuid -> knownGtidSet.forServerWithId(uuid) != null),
+ purgedServerGtid),
+ filteredGtidSet);
+ } else {
+ mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
+ }
+
+ LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet);
+ return mergedGtidSet;
+ }
+
+ /**
+ * @return the streaming metrics object used by this source
+ */
+ MySqlStreamingChangeEventSourceMetrics getMetrics() {
+ return metrics;
+ }
+
+ /**
+  * Reconnects the binlog client at the given position, provided the context is still running.
+  * <p>
+  * An {@link IOException} raised while reconnecting is logged and not propagated.
+  *
+  * @param context the change event source context; nothing happens when it is not running
+  * @param position the binlog file name and position to restart reading from
+  */
+ void rewindBinaryLogClient(ChangeEventSourceContext context, BinlogPosition position) {
+     if (!context.isRunning()) {
+         return;
+     }
+     try {
+         LOGGER.debug("Rewinding binlog to position {}", position);
+         client.disconnect();
+         client.setBinlogFilename(position.getFilename());
+         client.setBinlogPosition(position.getPosition());
+         client.connect();
+     } catch (IOException e) {
+         LOGGER.error("Unexpected error when re-connecting to the MySQL binary log reader", e);
+     }
+ }
+
+ BinlogPosition getCurrentBinlogPosition() {
+ return new BinlogPosition(client.getBinlogFilename(), client.getBinlogPosition());
+ }
+
+ /**
+ * Wraps the specified exception in a {@link DebeziumException}, ensuring that all useful state is captured inside
+ * the new exception's message.
+ *
+ * @param error the exception; may not be null
+ * @return the wrapped Kafka Connect exception
+ */
+ protected DebeziumException wrap(Throwable error) {
+ assert error != null;
+ String msg = error.getMessage();
+ if (error instanceof ServerException) {
+ ServerException e = (ServerException) error;
+ msg = msg + " Error code: " + e.getErrorCode() + "; SQLSTATE: " + e.getSqlState() + ".";
+ } else if (error instanceof SQLException) {
+ SQLException e = (SQLException) error;
+ msg = e.getMessage() + " Error code: " + e.getErrorCode() + "; SQLSTATE: " + e.getSQLState() + ".";
+ }
+ return new DebeziumException(msg, error);
+ }
+
+ /**
+  * Lifecycle listener registered on the binlog client: logs connects and disconnects and
+  * forwards communication and deserialization failures to the error handler.
+  */
+ protected final class ReaderThreadLifecycleListener implements LifecycleListener {
+
+     /** Logs how many events were read and the last recorded offset, if any. */
+     @Override
+     public void onDisconnect(BinaryLogClient client) {
+         if (!LOGGER.isInfoEnabled()) {
+             return;
+         }
+         taskContext.temporaryLoggingContext(connectorConfig, "binlog", () -> {
+             final Map<String, ?> recorded = lastOffset;
+             if (recorded == null) {
+                 LOGGER.info("Stopped reading binlog after {} events, no new offset was recorded",
+                         totalRecordCounter);
+             } else {
+                 LOGGER.info("Stopped reading binlog after {} events, last recorded offset: {}",
+                         totalRecordCounter, recorded);
+             }
+         });
+     }
+
+     /** Sets up the logging context for the calling thread and logs the starting offset. */
+     @Override
+     public void onConnect(BinaryLogClient client) {
+         // Set up the MDC logging context for this thread ...
+         taskContext.configureLoggingContext("binlog");
+
+         // The event row number will be used when processing the first event ...
+         LOGGER.info("Connected to MySQL binlog at {}:{}, starting at {}",
+                 connectorConfig.hostname(), connectorConfig.port(), offsetContext);
+     }
+
+     /** Disconnects the client and reports the failure to the error handler. */
+     @Override
+     public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
+         LOGGER.debug("A communication failure event arrived", ex);
+         logStreamingSourceState();
+         try {
+             // Stop BinaryLogClient background threads
+             client.disconnect();
+         } catch (final Exception e) {
+             LOGGER.debug("Exception while closing client", e);
+         }
+         errorHandler.setProducerThrowable(wrap(ex));
+     }
+
+     /**
+      * Handles a deserialization failure according to the configured handling mode: FAIL reports
+      * it to the error handler, WARN logs a warning, and any other mode logs at debug level.
+      */
+     @Override
+     public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
+         final EventProcessingFailureHandlingMode mode = eventDeserializationFailureHandlingMode;
+         if (mode == EventProcessingFailureHandlingMode.FAIL) {
+             LOGGER.debug("A deserialization failure event arrived", ex);
+             logStreamingSourceState();
+             errorHandler.setProducerThrowable(wrap(ex));
+             return;
+         }
+         if (mode == EventProcessingFailureHandlingMode.WARN) {
+             LOGGER.warn("A deserialization failure event arrived", ex);
+             logStreamingSourceState(Level.WARN);
+             return;
+         }
+         LOGGER.debug("A deserialization failure event arrived", ex);
+         logStreamingSourceState(Level.DEBUG);
+     }
+ }
+
+ /**
+ * Sets the {@code isRestoredFromCheckpoint} flag to {@code true}.
+ * NOTE(review): presumably called when the job is restored from a checkpoint/savepoint; the code
+ * that reads the flag is outside this section, so confirm against the callers.
+ */
+ protected void setRestoredFromCheckpoint() {
+ this.isRestoredFromCheckpoint = true;
+ }
+
+ /**
+ * Resolves the {@link TableId} that a binlog event payload refers to.
+ *
+ * @param <E> the type of the event payload
+ */
+ @FunctionalInterface
+ private interface TableIdProvider<E extends EventData> {
+
+ /**
+ * @param data the event payload
+ * @return the table the payload refers to; {@code handleChange} treats {@code null} as an unknown table
+ */
+ TableId getTableId(E data);
+ }
+
+ /**
+ * Extracts the rows carried by a binlog event payload.
+ *
+ * @param <E> the type of the event payload
+ * @param <U> the type of a single row
+ */
+ @FunctionalInterface
+ private interface RowsProvider<E extends EventData, U> {
+
+ /**
+ * @param data the event payload
+ * @return the rows contained in the payload
+ */
+ List<U> getRows(E data);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
index 7082845a56..e07bc3d656 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/context/StatefulTaskContext.java
@@ -28,6 +28,7 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.GtidSet;
+import io.debezium.connector.mysql.GtidUtils;
import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
@@ -219,10 +220,20 @@ public class StatefulTaskContext {
"Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
return false;
}
- // GTIDs are enabled
- GtidSet gtidSet = new GtidSet(gtidStr);
// Get the GTID set that is available in the server ...
GtidSet availableGtidSet = new GtidSet(availableGtidStr);
+
+ // GTIDs are enabled
+ LOG.info("Merging server GTID set {} with restored GTID set {}", availableGtidSet, gtidStr);
+
+ // Based on the current server's GTID, the GTID in MySqlOffsetContext is adjusted to ensure
+ // the completeness of
+ // the GTID. This is done to address the issue of being unable to recover from a checkpoint
+ // in certain startup
+ // modes.
+ GtidSet gtidSet = GtidUtils.fixRestoredGtidSet(availableGtidSet, new GtidSet(gtidStr));
+ LOG.info("Merged GTID set is {}", gtidSet);
+
if (gtidSet.isContainedWithin(availableGtidSet)) {
LOG.info(
"MySQL current GTID set {} does contain the GTID set {} required by the connector.",
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 5328121085..4ad493afe9 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -438,6 +438,8 @@
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/FinishedSnapshotSplitInfo.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeFetcher.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/internal/DebeziumChangeConsumer.java
+ inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
+ inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/ColumnImpl.java