You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by "e-mhui (via GitHub)" <gi...@apache.org> on 2023/06/25 07:56:48 UTC

[GitHub] [inlong] e-mhui commented on a diff in pull request #8308: [INLONG-8307][Sort] Fix job restart failed from savepoint When set 'scan.startup.mode' = 'timestamp|earliest-offset|specific-offset'

e-mhui commented on code in PR #8308:
URL: https://github.com/apache/inlong/pull/8308#discussion_r1241081084


##########
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java:
##########
@@ -0,0 +1,1262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventData;
+import com.github.shyiko.mysql.binlog.event.EventHeader;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.GtidEventData;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
+import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+import com.github.shyiko.mysql.binlog.network.AuthenticationException;
+import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
+import com.github.shyiko.mysql.binlog.network.SSLMode;
+import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
+import com.github.shyiko.mysql.binlog.network.ServerException;
+import io.debezium.DebeziumException;
+import io.debezium.annotation.SingleThreadAccess;
+import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode;
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
+import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
+import io.debezium.data.Envelope.Operation;
+import io.debezium.function.BlockingConsumer;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
+import io.debezium.relational.TableId;
+import io.debezium.schema.SchemaChangeEvent;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import io.debezium.util.Strings;
+import io.debezium.util.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import static io.debezium.util.Strings.isNullOrEmpty;
+
+/**
+ * Copied from Debezium project to fix
+ * https://github.com/apache/inlong/issues/8307.
+ *
+ * <p>Lines 1134-1139: Adjust the GTID merging logic to support recovering a job that
+ * previously specified a starting offset on start.
+ */
+public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStreamingChangeEventSource.class);
+
+    private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
+
+    private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class);
+    private final BinaryLogClient client;
+    private final MySqlStreamingChangeEventSourceMetrics metrics;
+    private final Clock clock;
+    private final EventProcessingFailureHandlingMode eventDeserializationFailureHandlingMode;
+    private final EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode;
+
+    private int startingRowNumber = 0;
+    private long initialEventsToSkip = 0L;
+    private boolean skipEvent = false;
+    private boolean ignoreDmlEventByGtidSource = false;
+    private final Predicate<String> gtidDmlSourceFilter;
+    private final AtomicLong totalRecordCounter = new AtomicLong();
+    private volatile Map<String, ?> lastOffset = null;
+    private com.github.shyiko.mysql.binlog.GtidSet gtidSet;
+    private final float heartbeatIntervalFactor = 0.8f;
+    private final Map<String, Thread> binaryLogClientThreads = new ConcurrentHashMap<>(4);
+    private final MySqlTaskContext taskContext;
+    private final MySqlConnectorConfig connectorConfig;
+    private final MySqlConnection connection;
+    private final EventDispatcher<TableId> eventDispatcher;
+    private final MySqlOffsetContext offsetContext;
+    private final ErrorHandler errorHandler;
+    private boolean isRestoredFromCheckpoint = false;
+
+    @SingleThreadAccess("binlog client thread")
+    private Instant eventTimestamp;
+
+    /**
+     * Immutable value object identifying a position in the binlog: the binlog file name
+     * plus the byte offset within that file. Implements {@code equals}/{@code hashCode}
+     * so positions can be compared and used as map keys.
+     */
+    public static class BinlogPosition {
+
+        // Binlog file name; never null (enforced by the constructor assertion).
+        final String filename;
+        // Byte offset within the binlog file.
+        final long position;
+
+        public BinlogPosition(String filename, long position) {
+            assert filename != null;
+
+            this.filename = filename;
+            this.position = position;
+        }
+
+        public String getFilename() {
+            return filename;
+        }
+
+        public long getPosition() {
+            return position;
+        }
+
+        @Override
+        public String toString() {
+            return filename + "/" + position;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + filename.hashCode();
+            // Fold both halves of the long offset into the hash.
+            result = prime * result + (int) (position ^ (position >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            BinlogPosition other = (BinlogPosition) obj;
+            if (!filename.equals(other.filename)) {
+                return false;
+            }
+            if (position != other.position) {
+                return false;
+            }
+            return true;
+        }
+    }
+
+    /**
+     * Callback used to emit a change event for a single table from raw binlog row data
+     * of type {@code T}.
+     */
+    @FunctionalInterface
+    private static interface BinlogChangeEmitter<T> {
+
+        void emit(TableId tableId, T data) throws InterruptedException;
+    }
+
+    /**
+     * Creates the streaming source and fully configures the shared {@link BinaryLogClient}:
+     * thread factory, server id, SSL, keep-alive/heartbeat intervals, and the event
+     * deserializers (including custom row deserializers keyed by the most recent
+     * TABLE_MAP event per table number).
+     *
+     * @param connectorConfig connector configuration (server id, SSL mode, GTID filters, ...)
+     * @param offsetContext restored offset context, or {@code null} when no snapshot created
+     *            one (snapshot mode NEVER); an initial context is created in that case
+     * @param connection JDBC connection to MySQL, used to look up SSL session details
+     * @param dispatcher dispatcher that forwards change events downstream
+     * @param errorHandler handler that receives producer failures
+     * @param clock clock used to compute lag metrics
+     * @param taskContext task context owning the {@link BinaryLogClient} and schema state
+     * @param metrics streaming metrics updated while reading the binlog
+     */
+    public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlOffsetContext offsetContext,
+            MySqlConnection connection,
+            EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock,
+            MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics) {
+
+        this.taskContext = taskContext;
+        this.connectorConfig = connectorConfig;
+        this.connection = connection;
+        this.clock = clock;
+        this.eventDispatcher = dispatcher;
+        this.errorHandler = errorHandler;
+        // With snapshot mode NEVER the initial context is not created by snapshot
+        this.offsetContext = (offsetContext == null) ? MySqlOffsetContext.initial(connectorConfig) : offsetContext;
+        this.metrics = metrics;
+
+        eventDeserializationFailureHandlingMode = connectorConfig.getEventProcessingFailureHandlingMode();
+        inconsistentSchemaHandlingMode = connectorConfig.inconsistentSchemaFailureHandlingMode();
+
+        // Set up the log reader ...
+        client = taskContext.getBinaryLogClient();
+        // BinaryLogClient will overwrite thread names later
+        client.setThreadFactory(
+                Threads.threadFactory(MySqlConnector.class, connectorConfig.getLogicalName(), "binlog-client", false,
+                        false,
+                        x -> binaryLogClientThreads.put(x.getName(), x)));
+        client.setServerId(connectorConfig.serverId());
+        client.setSSLMode(sslModeFor(connectorConfig.sslMode()));
+        if (connectorConfig.sslModeEnabled()) {
+            SSLSocketFactory sslSocketFactory = getBinlogSslSocketFactory(connectorConfig, connection);
+            if (sslSocketFactory != null) {
+                client.setSslSocketFactory(sslSocketFactory);
+            }
+        }
+        Configuration configuration = connectorConfig.getConfig();
+        client.setKeepAlive(configuration.getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
+        final long keepAliveInterval = configuration.getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
+        client.setKeepAliveInterval(keepAliveInterval);
+        // Considering heartbeatInterval should be less than keepAliveInterval, we use the heartbeatIntervalFactor
+        // multiply by keepAliveInterval and set the result value to heartbeatInterval.The default value of
+        // heartbeatIntervalFactor
+        // is 0.8, and we believe the left time (0.2 * keepAliveInterval) is enough to process the packet received from
+        // the MySQL server.
+        client.setHeartbeatInterval((long) (keepAliveInterval * heartbeatIntervalFactor));
+
+        boolean filterDmlEventsByGtidSource =
+                configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
+        gtidDmlSourceFilter = filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null;
+
+        // Set up the event deserializer with additional type(s) ...
+        final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
+        EventDeserializer eventDeserializer = new EventDeserializer() {
+
+            @Override
+            public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
+                try {
+                    // Delegate to the superclass ...
+                    Event event = super.nextEvent(inputStream);
+
+                    // We have to record the most recent TableMapEventData for each table number for our custom
+                    // deserializers ...
+                    if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
+                        TableMapEventData tableMapEvent = event.getData();
+                        tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
+                    }
+                    return event;
+                }
+                // DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
+                catch (EventDataDeserializationException edde) {
+                    // DBZ-3095 As of Java 15, when reaching EOF in the binlog stream, the polling loop in
+                    // BinaryLogClient#listenForEventPackets() keeps returning values != -1 from peek();
+                    // this causes the loop to never finish
+                    // Propagating the exception (either EOF or socket closed) causes the loop to be aborted
+                    // in this case
+                    if (edde.getCause() instanceof IOException) {
+                        throw edde;
+                    }
+
+                    // Build a synthetic INCIDENT event carrying the deserialization failure so it can
+                    // be logged/handled downstream instead of silently dropping the event.
+                    EventHeaderV4 header = new EventHeaderV4();
+                    header.setEventType(EventType.INCIDENT);
+                    header.setTimestamp(edde.getEventHeader().getTimestamp());
+                    header.setServerId(edde.getEventHeader().getServerId());
+
+                    if (edde.getEventHeader() instanceof EventHeaderV4) {
+                        header.setEventLength(((EventHeaderV4) edde.getEventHeader()).getEventLength());
+                        header.setNextPosition(((EventHeaderV4) edde.getEventHeader()).getNextPosition());
+                        header.setFlags(((EventHeaderV4) edde.getEventHeader()).getFlags());
+                    }
+
+                    EventData data = new EventDataDeserializationExceptionData(edde);
+                    return new Event(header, data);
+                }
+            }
+        };
+
+        // Add our custom deserializers ...
+        eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
+        eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
+        eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
+                new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
+        eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
+                new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
+        eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
+                new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
+        eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
+                new RowDeserializers.WriteRowsDeserializer(
+                        tableMapEventByTableId).setMayContainExtraInformation(true));
+        eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
+                new RowDeserializers.UpdateRowsDeserializer(
+                        tableMapEventByTableId).setMayContainExtraInformation(true));
+        eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
+                new RowDeserializers.DeleteRowsDeserializer(
+                        tableMapEventByTableId).setMayContainExtraInformation(true));
+        client.setEventDeserializer(eventDeserializer);
+    }
+
+    /**
+     * Updates the "milliseconds behind source" metric from the event's header timestamp.
+     * HEARTBEAT events carry no timestamp and are fired only when the connection is idle
+     * (i.e. we are caught up), so they report a lag of zero; events with a zero timestamp
+     * are traced and otherwise ignored.
+     *
+     * @param event the binlog event just received; may not be null
+     */
+    protected void onEvent(Event event) {
+        long ts = 0;
+
+        if (event.getHeader().getEventType() == EventType.HEARTBEAT) {
+            // HEARTBEAT events have no timestamp but are fired only when
+            // there is no traffic on the connection which means we are caught-up
+            // https://dev.mysql.com/doc/internals/en/heartbeat-event.html
+            metrics.setMilliSecondsBehindSource(ts);
+            return;
+        }
+
+        // MySQL has seconds resolution but mysql-binlog-connector-java returns
+        // a value in milliseconds
+        long eventTs = event.getHeader().getTimestamp();
+
+        if (eventTs == 0) {
+            LOGGER.trace("Received unexpected event with 0 timestamp: {}", event);
+            return;
+        }
+
+        ts = clock.currentTimeInMillis() - eventTs;
+        LOGGER.trace("Current milliseconds behind source: {} ms", ts);
+        metrics.setMilliSecondsBehindSource(ts);
+    }
+
+    /**
+     * No-op fallback used for event types with no registered handler; only traces the event.
+     *
+     * @param event the binlog event being ignored; may not be null
+     */
+    protected void ignoreEvent(Event event) {
+        LOGGER.trace("Ignoring event due to missing handler: {}", event);
+    }
+
+    /**
+     * Central dispatch for every binlog event. Updates the offset context (binlog server
+     * thread, event position, or — on ROTATE — the new binlog file/position), forwards the
+     * event to the handler registered for its type, emits heartbeat messages, and maintains
+     * the skip-event counter used when restarting mid-way through a previously processed
+     * transaction. Runtime failures are routed to the {@link ErrorHandler} and all handlers
+     * are cleared so that subsequent events are ignored until the connector shuts down.
+     *
+     * @param event the binlog event to process; {@code null} events are ignored
+     */
+    protected void handleEvent(Event event) {
+        if (event == null) {
+            return;
+        }
+
+        final EventHeader eventHeader = event.getHeader();
+        // Update the source offset info. Note that the client returns the value in *milliseconds*, even though the
+        // binlog
+        // contains only *seconds* precision ...
+        // HEARTBEAT events have no timestamp; only set the timestamp if the event is not a HEARTBEAT
+        eventTimestamp = !eventHeader.getEventType().equals(EventType.HEARTBEAT)
+                ? Instant.ofEpochMilli(eventHeader.getTimestamp())
+                : null;
+        offsetContext.setBinlogThread(eventHeader.getServerId());
+
+        final EventType eventType = eventHeader.getEventType();
+        if (eventType == EventType.ROTATE) {
+            EventData eventData = event.getData();
+            RotateEventData rotateEventData;
+            // The deserializer may have wrapped the rotate data; unwrap before use.
+            if (eventData instanceof EventDeserializer.EventDataWrapper) {
+                rotateEventData = (RotateEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal();
+            } else {
+                rotateEventData = (RotateEventData) eventData;
+            }
+            offsetContext.setBinlogStartPoint(rotateEventData.getBinlogFilename(), rotateEventData.getBinlogPosition());
+        } else if (eventHeader instanceof EventHeaderV4) {
+            EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
+            offsetContext.setEventPosition(trackableEventHeader.getPosition(), trackableEventHeader.getEventLength());
+        }
+
+        // If there is a handler for this event, forward the event to it ...
+        try {
+            // Forward the event to the handler ...
+            eventHandlers.getOrDefault(eventType, this::ignoreEvent).accept(event);
+
+            // Generate heartbeat message if the time is right
+            eventDispatcher.dispatchHeartbeatEvent(offsetContext);
+
+            // Capture that we've completed another event ...
+            offsetContext.completeEvent();
+
+            if (skipEvent) {
+                // We're in the mode of skipping events and we just skipped this one, so decrement our skip count ...
+                --initialEventsToSkip;
+                skipEvent = initialEventsToSkip > 0;
+            }
+        } catch (RuntimeException e) {
+            // There was an error in the event handler, so propagate the failure to Kafka Connect ...
+            logStreamingSourceState();
+            errorHandler.setProducerThrowable(new DebeziumException("Error processing binlog event", e));
+            // Do not stop the client, since Kafka Connect should stop the connector on it's own
+            // (and doing it here may cause problems the second time it is stopped).
+            // We can clear the listeners though so that we ignore all future events ...
+            eventHandlers.clear();
+            LOGGER.info(
+                    "Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored.");
+        } catch (InterruptedException e) {
+            // Most likely because this reader was stopped and our thread was interrupted ...
+            Thread.currentThread().interrupt();
+            eventHandlers.clear();
+            LOGGER.info("Stopped processing binlog events due to thread interruption");
+        }
+    }
+
+    /**
+     * Returns the event's payload, unwrapping {@link EventDeserializer.EventDataWrapper}
+     * when the deserializer wrapped the internal data.
+     *
+     * @param event the binlog event; may not be null
+     * @return the (unwrapped) event data, cast to the caller's expected type
+     */
+    @SuppressWarnings("unchecked")
+    protected <T extends EventData> T unwrapData(Event event) {
+        EventData eventData = event.getData();
+        if (eventData instanceof EventDeserializer.EventDataWrapper) {
+            eventData = ((EventDeserializer.EventDataWrapper) eventData).getInternal();
+        }
+        return (T) eventData;
+    }
+
+    /**
+     * Handle the supplied event that signals that mysqld has stopped. The event is only
+     * logged; no connector state is updated.
+     *
+     * @param event the server stopped event to be processed; may not be null
+     */
+    protected void handleServerStop(Event event) {
+        LOGGER.debug("Server stopped: {}", event);
+    }
+
+    /**
+     * Handle the supplied event that is sent by a primary to a replica to let the replica know that the primary is still alive. Not
+     * written to a binary log. The event is only traced; no connector state is updated.
+     *
+     * @param event the server heartbeat event to be processed; may not be null
+     */
+    protected void handleServerHeartbeat(Event event) {
+        LOGGER.trace("Server heartbeat: {}", event);
+    }
+
+    /**
+     * Handle the supplied event that signals that an out of the ordinary event that occurred on the master. It notifies the replica
+     * that something happened on the primary that might cause data to be in an inconsistent state.
+     * When the incident carries a deserialization failure, behavior depends on the configured
+     * failure-handling mode: FAIL rethrows, WARN logs and skips, and any other mode (IGNORE)
+     * silently skips the event.
+     *
+     * @param event the server incident event to be processed; may not be null
+     */
+    protected void handleServerIncident(Event event) {
+        if (event.getData() instanceof EventDataDeserializationExceptionData) {
+            metrics.onErroneousEvent("source = " + event.toString());
+            EventDataDeserializationExceptionData data = event.getData();
+
+            EventHeaderV4 eventHeader = (EventHeaderV4) data.getCause().getEventHeader(); // safe cast, instantiated
+                                                                                          // that ourselves
+
+            // logging some additional context but not the exception itself, this will happen in handleEvent()
+            if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+                LOGGER.error(
+                        "Error while deserializing binlog event at offset {}.{}" +
+                                "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        offsetContext.getOffset(),
+                        System.lineSeparator(),
+                        eventHeader.getPosition(),
+                        eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename());
+
+                throw new RuntimeException(data.getCause());
+            } else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+                LOGGER.warn(
+                        "Error while deserializing binlog event at offset {}.{}" +
+                                "This exception will be ignored and the event be skipped.{}" +
+                                "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        offsetContext.getOffset(),
+                        System.lineSeparator(),
+                        System.lineSeparator(),
+                        eventHeader.getPosition(),
+                        eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename(),
+                        data.getCause());
+            }
+            // Any other mode (e.g. IGNORE) falls through and skips the event silently.
+        } else {
+            LOGGER.error("Server incident: {}", event);
+        }
+    }
+
+    /**
+     * Handle the supplied event with a {@link RotateEventData} that signals the logs are being rotated. This means that either
+     * the server was restarted, or the binlog has transitioned to a new file. In either case, subsequent table numbers will be
+     * different than those seen to this point.
+     *
+     * @param event the database change data event to be processed; may not be null
+     */
+    protected void handleRotateLogsEvent(Event event) {
+        LOGGER.debug("Rotating logs: {}", event);
+        RotateEventData command = unwrapData(event);
+        assert command != null;
+        // Table numbers are only stable within a single binlog file, so cached
+        // table-number-to-table-id mappings are invalid after a rotation.
+        taskContext.getSchema().clearTableMappings();
+    }
+
+    /**
+     * Handle the supplied event with a {@link GtidEventData} that signals the beginning of a GTID transaction.
+     * We don't yet know whether this transaction contains any events we're interested in, but we have to record
+     * it so that we know the position of this event and know we've processed the binlog to this point.
+     * <p>
+     * Note that this captures the current GTID and complete GTID set, regardless of whether the connector is
+     * {@link MySqlTaskContext#gtidSourceFilter() filtering} the GTID set upon connection. We do this because
+     * we actually want to capture all GTID set values found in the binlog, whether or not we process them.
+     * However, only when we connect do we actually want to pass to MySQL only those GTID ranges that are applicable
+     * per the configuration.
+     *
+     * @param event the GTID event to be processed; may not be null
+     */
+    protected void handleGtidEvent(Event event) {
+        LOGGER.debug("GTID transaction: {}", event);
+        GtidEventData gtidEvent = unwrapData(event);
+        String gtid = gtidEvent.getGtid();
+        gtidSet.add(gtid);
+        offsetContext.startGtid(gtid, gtidSet.toString()); // rather than use the client's GTID set
+        ignoreDmlEventByGtidSource = false;
+        if (gtidDmlSourceFilter != null && gtid != null) {
+            // NOTE(review): the substring end index comes from the *untrimmed* string while the
+            // substring is taken from the trimmed one; this is only safe if the GTID never has
+            // leading whitespace — appears to hold for server-generated GTIDs, but confirm.
+            String uuid = gtid.trim().substring(0, gtid.indexOf(":"));
+            if (!gtidDmlSourceFilter.test(uuid)) {
+                // DML events of this transaction originate from a filtered-out source UUID.
+                ignoreDmlEventByGtidSource = true;
+            }
+        }
+        metrics.onGtidChange(gtid);
+    }
+
+    /**
+     * Handle the supplied event with an {@link RowsQueryEventData} by recording the original SQL query
+     * that generated the event. The query is stored on the offset context so it can be associated
+     * with the row events that follow it.
+     *
+     * @param event the database change data event to be processed; may not be null
+     */
+    protected void handleRowsQuery(Event event) {
+        // Unwrap the RowsQueryEvent
+        final RowsQueryEventData lastRowsQueryEventData = unwrapData(event);
+
+        // Set the query on the source
+        offsetContext.setQuery(lastRowsQueryEventData.getQuery());
+    }
+
+    /**
+     * Handle the supplied event with an {@link QueryEventData} by possibly recording the DDL statements as changes in the
+     * MySQL schemas. BEGIN/COMMIT bracket transactions, XA statements and filtered DDL are
+     * ignored, statement-based DML is rejected or warned about per the configured failure
+     * handling mode, and anything else is parsed as streaming DDL and dispatched as schema
+     * change events.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while recording the DDL statements
+     */
+    protected void handleQueryEvent(Event event) throws InterruptedException {
+        QueryEventData command = unwrapData(event);
+        LOGGER.debug("Received query command: {}", event);
+        String sql = command.getSql().trim();
+        if (sql.equalsIgnoreCase("BEGIN")) {
+            // We are starting a new transaction ...
+            offsetContext.startNextTransaction();
+            eventDispatcher.dispatchTransactionStartedEvent(offsetContext.getTransactionId(), offsetContext);
+            offsetContext.setBinlogThread(command.getThreadId());
+            if (initialEventsToSkip != 0) {
+                LOGGER.debug(
+                        "Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event",
+                        initialEventsToSkip, startingRowNumber);
+                // We are restarting, so we need to skip the events in this transaction that we processed previously...
+                skipEvent = true;
+            }
+            return;
+        }
+        if (sql.equalsIgnoreCase("COMMIT")) {
+            handleTransactionCompletion(event);
+            return;
+        }
+
+        // First 7 characters are enough to classify the statement keyword (e.g. "INSERT ", "XA ").
+        String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
+
+        if (upperCasedStatementBegin.startsWith("XA ")) {
+            // This is an XA transaction, and we currently ignore these and do nothing ...
+            return;
+        }
+        if (connectorConfig.getDdlFilter().test(sql)) {
+            LOGGER.debug("DDL '{}' was filtered out of processing", sql);
+            return;
+        }
+        if (upperCasedStatementBegin.equals("INSERT ") || upperCasedStatementBegin.equals("UPDATE ")
+                || upperCasedStatementBegin.equals("DELETE ")) {
+            // Statement-based DML in the binlog cannot be handled as row events.
+            if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+                throw new DebeziumException(
+                        "Received DML '" + sql
+                                + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
+            } else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+                LOGGER.warn("Warning only: Received DML '" + sql
+                        + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
+                return;
+            } else {
+                return;
+            }
+        }
+        if (sql.equalsIgnoreCase("ROLLBACK")) {
+            // We have hit a ROLLBACK which is not supported
+            LOGGER.warn(
+                    "Rollback statements cannot be handled without binlog buffering, the connector will fail. Please check '{}' to see how to enable buffering",
+                    MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());
+        }
+
+        final List<SchemaChangeEvent> schemaChangeEvents =
+                taskContext.getSchema().parseStreamingDdl(sql, command.getDatabase(), offsetContext,
+                        clock.currentTimeAsInstant());
+        try {
+            for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
+                if (taskContext.getSchema().skipSchemaChangeEvent(schemaChangeEvent)) {
+                    continue;
+                }
+
+                final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null
+                        : schemaChangeEvent.getTables().iterator().next().id();
+                eventDispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> {
+                    try {
+                        receiver.schemaChangeEvent(schemaChangeEvent);
+                    } catch (Exception e) {
+                        throw new DebeziumException(e);
+                    }
+                });
+            }
+        } catch (InterruptedException e) {
+            // NOTE(review): the interrupt is swallowed here without re-interrupting the thread
+            // (no Thread.currentThread().interrupt()), unlike handleEvent(); kept as-is to match
+            // the upstream Debezium source this file is copied from.
+            LOGGER.info("Processing interrupted");
+        }
+    }
+
+    /**
+     * Completes the current transaction: dispatches the transaction-committed event, commits
+     * the offset context, clears the binlog thread id, and resets the per-transaction
+     * skip/ignore flags.
+     *
+     * @param event the event that ended the transaction; currently unused by this method
+     * @throws InterruptedException if the thread is interrupted while dispatching
+     */
+    private void handleTransactionCompletion(Event event) throws InterruptedException {
+        // We are completing the transaction ...
+        eventDispatcher.dispatchTransactionCommittedEvent(offsetContext);
+        offsetContext.commitTransaction();
+        offsetContext.setBinlogThread(-1L);
+        skipEvent = false;
+        ignoreDmlEventByGtidSource = false;
+    }
+
+    /**
+     * Handle a change in the table metadata.
+     * <p>
+     * This method should be called whenever we consume a TABLE_MAP event, and every transaction in the log should include one
+     * of these for each table affected by the transaction. Each table map event includes a monotonically-increasing numeric
+     * identifier, and this identifier is used within subsequent events within the same transaction. This table identifier can
+     * change when:
+     * <ol>
+     * <li>the table structure is modified (e.g., via an {@code ALTER TABLE ...} command); or</li>
+     * <li>MySQL rotates to a new binary log file, even if the table structure does not change.</li>
+     * </ol>
+     *
+     * @param event the update event; never null
+     */
+    protected void handleUpdateTableMetadata(Event event) {
+        TableMapEventData metadata = unwrapData(event);
+        long tableNumber = metadata.getTableId();
+        String databaseName = metadata.getDatabase();
+        String tableName = metadata.getTable();
+        // MySQL has no schema/catalog distinction here, so the middle component is null.
+        TableId tableId = new TableId(databaseName, null, tableName);
+        if (taskContext.getSchema().assignTableNumber(tableNumber, tableId)) {
+            LOGGER.debug("Received update table metadata event: {}", event);
+        } else {
+            // Table is unknown to the schema; warn/fail per the inconsistent-schema handling mode.
+            informAboutUnknownTableIfRequired(event, tableId, "update table metadata");
+        }
+    }
+
+    /**
+     * If we receive an event for a table that is monitored but whose metadata we
+     * don't know, either ignore that event or raise a warning or error as per the
+     * {@link MySqlConnectorConfig#INCONSISTENT_SCHEMA_HANDLING_MODE} configuration.
+     *
+     * @param event the event that referenced the unknown table; used for logging and metrics
+     * @param tableId the id of the table whose schema is unknown; may be null
+     * @param typeToLog a short description of the event type, used only in the filtered-table log line
+     */
+    private void informAboutUnknownTableIfRequired(Event event, TableId tableId, String typeToLog) {
+        // Only escalate for tables the connector is configured to capture; anything else
+        // is a non-monitored table and is merely counted as filtered.
+        if (tableId != null && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
+            metrics.onErroneousEvent("source = " + tableId + ", event " + event);
+            EventHeaderV4 eventHeader = event.getHeader();
+
+            // FAIL mode: log at error level and abort the connector with an exception.
+            if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+                LOGGER.error(
+                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        event, offsetContext.getOffset(), tableId, System.lineSeparator(), eventHeader.getPosition(),
+                        eventHeader.getNextPosition(), offsetContext.getSource().binlogFilename());
+                throw new DebeziumException("Encountered change event for table " + tableId
+                        + " whose schema isn't known to this connector");
+            } else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+                // WARN mode: log at warn level and drop the event.
+                LOGGER.warn(
+                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+                                + "The event will be ignored.{}"
+                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        event, offsetContext.getOffset(), tableId, System.lineSeparator(), System.lineSeparator(),
+                        eventHeader.getPosition(), eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename());
+            } else {
+                // Any other mode: log at debug level only and drop the event.
+                LOGGER.debug(
+                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+                                + "The event will be ignored.{}"
+                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        event, offsetContext.getOffset(), tableId, System.lineSeparator(), System.lineSeparator(),
+                        eventHeader.getPosition(), eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename());
+            }
+        } else {
+            LOGGER.debug("Filtering {} event: {} for non-monitored table {}", typeToLog, event, tableId);
+            metrics.onFilteredEvent("source = " + tableId);
+        }
+    }
+
+    /**
+     * Generate source records for the supplied event with an {@link WriteRowsEventData}.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void handleInsert(Event event) throws InterruptedException {
+        handleChange(event, "insert", WriteRowsEventData.class,
+                data -> taskContext.getSchema().getTableId(data.getTableId()),
+                data -> data.getRows(),
+                (table, columns) -> eventDispatcher.dispatchDataChangeEvent(table,
+                        new MySqlChangeRecordEmitter(offsetContext, clock, Operation.CREATE, null, columns)));
+    }
+
+    /**
+     * Generate source records for the supplied event with an {@link UpdateRowsEventData}.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void handleUpdate(Event event) throws InterruptedException {
+        handleChange(event, "update", UpdateRowsEventData.class,
+                data -> taskContext.getSchema().getTableId(data.getTableId()),
+                data -> data.getRows(),
+                (table, rowEntry) -> eventDispatcher.dispatchDataChangeEvent(table,
+                        new MySqlChangeRecordEmitter(offsetContext, clock, Operation.UPDATE,
+                                rowEntry.getKey(), rowEntry.getValue())));
+    }
+
+    /**
+     * Generate source records for the supplied event with an {@link DeleteRowsEventData}.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void handleDelete(Event event) throws InterruptedException {
+        handleChange(event, "delete", DeleteRowsEventData.class,
+                data -> taskContext.getSchema().getTableId(data.getTableId()),
+                data -> data.getRows(),
+                (table, columns) -> eventDispatcher.dispatchDataChangeEvent(table,
+                        new MySqlChangeRecordEmitter(offsetContext, clock, Operation.DELETE, columns, null)));
+    }
+
+    /**
+     * Shared driver for insert/update/delete row events: unwraps the event data, resolves
+     * the table id, and emits one change record per row not already processed before restart.
+     *
+     * @param event the row event to process; may not be null
+     * @param changeType short operation name ("insert", "update", "delete") used in log messages
+     * @param eventDataClass the expected event-data type; not read in the body, it only binds
+     *            the generic type {@code T} at call sites
+     * @param tableIdProvider resolves the {@link TableId} from the unwrapped event data
+     * @param rowsProvider extracts the per-row payloads from the unwrapped event data
+     * @param changeEmitter dispatches a single row's change record
+     * @throws InterruptedException if this thread is interrupted while dispatching
+     */
+    private <T extends EventData, U> void handleChange(Event event, String changeType, Class<T> eventDataClass,
+            TableIdProvider<T> tableIdProvider,
+            RowsProvider<T, U> rowsProvider, BinlogChangeEmitter<U> changeEmitter)
+            throws InterruptedException {
+        if (skipEvent) {
+            // We can skip this because we should already be at least this far ...
+            LOGGER.info("Skipping previously processed row event: {}", event);
+            return;
+        }
+        if (ignoreDmlEventByGtidSource) {
+            LOGGER.debug("Skipping DML event because this GTID source is filtered: {}", event);
+            return;
+        }
+        final T data = unwrapData(event);
+        final TableId tableId = tableIdProvider.getTableId(data);
+        final List<U> rows = rowsProvider.getRows(data);
+
+        if (tableId != null && taskContext.getSchema().schemaFor(tableId) != null) {
+            int count = 0;
+            int numRows = rows.size();
+            // startingRowNumber > 0 only right after a restart that stopped mid-event;
+            // rows before it were already emitted and are skipped.
+            if (startingRowNumber < numRows) {
+                for (int row = startingRowNumber; row != numRows; ++row) {
+                    offsetContext.setRowNumber(row, numRows);
+                    offsetContext.event(tableId, eventTimestamp);
+                    changeEmitter.emit(tableId, rows.get(row));
+                    count++;
+                }
+                if (LOGGER.isDebugEnabled()) {
+                    if (startingRowNumber != 0) {
+                        LOGGER.debug("Emitted {} {} record(s) for last {} row(s) in event: {}",
+                                count, changeType, numRows - startingRowNumber, event);
+                    } else {
+                        LOGGER.debug("Emitted {} {} record(s) for event: {}", count, changeType, event);
+                    }
+                }
+                offsetContext.changeEventCompleted();
+            } else {
+                // All rows were previously processed ...
+                LOGGER.debug("Skipping previously processed {} event: {}", changeType, event);
+            }
+        } else {
+            informAboutUnknownTableIfRequired(event, tableId, changeType + " row");
+        }
+        // After the first handled event, subsequent events always start at row 0.
+        startingRowNumber = 0;
+    }
+
+    /**
+     * Handle a {@link EventType#VIEW_CHANGE} event.
+     * <p>
+     * Intentionally a no-op beyond debug logging.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void viewChange(Event event) throws InterruptedException {
+        LOGGER.debug("View Change event: {}", event);
+        // do nothing
+    }
+
+    /**
+     * Handle a {@link EventType#XA_PREPARE} event.
+     * <p>
+     * Intentionally a no-op beyond debug logging.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void prepareTransaction(Event event) throws InterruptedException {
+        LOGGER.debug("XA Prepare event: {}", event);
+        // do nothing
+    }
+
+    /**
+     * Translate the connector's {@link SecureConnectionMode} into the binlog client's
+     * {@link SSLMode}.
+     *
+     * @param mode the configured secure-connection mode; may not be null
+     * @return the matching {@link SSLMode}, or null for an unmapped mode
+     */
+    private SSLMode sslModeFor(SecureConnectionMode mode) {
+        switch (mode) {
+            case DISABLED:
+                return SSLMode.DISABLED;
+            case PREFERRED:
+                return SSLMode.PREFERRED;
+            case REQUIRED:
+                return SSLMode.REQUIRED;
+            case VERIFY_CA:
+                return SSLMode.VERIFY_CA;
+            case VERIFY_IDENTITY:
+                return SSLMode.VERIFY_IDENTITY;
+            default:
+                // No corresponding client-side mode for any other value.
+                return null;
+        }
+    }
+
+    /**
+     * Main streaming loop: registers the per-event-type handlers, positions the binlog
+     * client (by GTID set when the server supports it, otherwise by binlog file name and
+     * position), connects the client, and then parks this thread until the source stops.
+     *
+     * @param context used to check whether the change event source should keep running
+     * @throws InterruptedException if this thread is interrupted while parked
+     */
+    @Override
+    public void execute(ChangeEventSourceContext context) throws InterruptedException {
+        if (!connectorConfig.getSnapshotMode().shouldStream()) {
+            LOGGER.info("Streaming is disabled for snapshot mode {}", connectorConfig.getSnapshotMode());
+            return;
+        }
+        taskContext.getSchema().assureNonEmptySchema();
+        final Set<Operation> skippedOperations = connectorConfig.getSkippedOps();
+
+        // Register our event handlers ...
+        eventHandlers.put(EventType.STOP, this::handleServerStop);
+        eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
+        eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
+        eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
+        eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
+        eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
+
+        // Row handlers are registered only for operations not configured as skipped.
+        if (!skippedOperations.contains(Operation.CREATE)) {
+            eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
+            eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
+        }
+
+        if (!skippedOperations.contains(Operation.UPDATE)) {
+            eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
+            eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
+        }
+
+        if (!skippedOperations.contains(Operation.DELETE)) {
+            eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
+            eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
+        }
+
+        eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
+        eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
+        eventHandlers.put(EventType.XID, this::handleTransactionCompletion);
+
+        // Conditionally register ROWS_QUERY handler to parse SQL statements.
+        if (connectorConfig.includeSqlQuery()) {
+            eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
+        }
+
+        // Route events directly to handleEvent, or through a bounded EventBuffer when a
+        // buffer size is configured.
+        client.registerEventListener(connectorConfig.bufferSizeForStreamingChangeEventSource() == 0
+                ? this::handleEvent
+                : (new EventBuffer(connectorConfig.bufferSizeForStreamingChangeEventSource(), this, context))::add);
+
+        client.registerLifecycleListener(new ReaderThreadLifecycleListener());
+        client.registerEventListener(this::onEvent);
+        if (LOGGER.isDebugEnabled()) {
+            client.registerEventListener(this::logEvent);
+        }
+
+        final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
+        metrics.setIsGtidModeEnabled(isGtidModeEnabled);
+
+        // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium
+        // checkpoint.
+        String availableServerGtidStr = connection.knownGtidSet();
+        if (isGtidModeEnabled) {
+            // The server is using GTIDs, so enable the handler ...
+            eventHandlers.put(EventType.GTID, this::handleGtidEvent);
+
+            // Now look at the GTID set from the server and what we've previously seen ...
+            GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);
+
+            // also take into account purged GTID logs
+            GtidSet purgedServerGtidSet = connection.purgedGtidSet();
+            LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet);
+
+            GtidSet filteredGtidSet = filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
+            if (filteredGtidSet != null) {
+                // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
+                LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
+                String filteredGtidSetStr = filteredGtidSet.toString();
+                client.setGtidSet(filteredGtidSetStr);
+                offsetContext.setCompletedGtidSet(filteredGtidSetStr);
+                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
+            } else {
+                // We've not yet seen any GTIDs, so that means we have to start reading the binlog from the beginning
+                // ...
+                client.setBinlogFilename(offsetContext.getSource().binlogFilename());
+                client.setBinlogPosition(offsetContext.getSource().binlogPosition());
+                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
+            }
+        } else {
+            // The server is not using GTIDs, so start reading the binlog based upon where we last left off ...
+            client.setBinlogFilename(offsetContext.getSource().binlogFilename());
+            client.setBinlogPosition(offsetContext.getSource().binlogPosition());
+        }
+
+        // We may be restarting in the middle of a transaction, so see how far into the transaction we have already
+        // processed...
+        initialEventsToSkip = offsetContext.eventsToSkipUponRestart();
+        LOGGER.info("Skip {} events on streaming start", initialEventsToSkip);
+
+        // Set the starting row number, which is the next row number to be read ...
+        startingRowNumber = offsetContext.rowsToSkipUponRestart();
+        LOGGER.info("Skip {} rows on streaming start", startingRowNumber);
+
+        // Only when we reach the first BEGIN event will we start to skip events ...
+        skipEvent = false;
+
+        try {
+            // Start the log reader, which starts background threads ...
+            if (context.isRunning()) {
+                long timeout = connectorConfig.getConnectionTimeout().toMillis();
+                long started = clock.currentTimeInMillis();
+                try {
+                    LOGGER.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
+                    client.connect(timeout);
+                    // Need to wait for keepalive thread to be running, otherwise it can be left orphaned
+                    // The problem is with timing. When the close is called too early after connect then
+                    // the keepalive thread is not terminated
+                    if (client.isKeepAlive()) {
+                        LOGGER.info("Waiting for keepalive thread to start");
+                        final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock);
+                        // Poll at ~100 ms intervals, at most 50 attempts (~5 s total).
+                        int waitAttempts = 50;
+                        boolean keepAliveThreadRunning = false;
+                        while (!keepAliveThreadRunning && waitAttempts-- > 0) {
+                            for (Thread t : binaryLogClientThreads.values()) {
+                                if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) {
+                                    LOGGER.info("Keepalive thread is running");
+                                    keepAliveThreadRunning = true;
+                                }
+                            }
+                            metronome.pause();
+                        }
+                    }
+                } catch (TimeoutException e) {
+                    // If the client thread is interrupted *before* the client could connect, the client throws a
+                    // timeout exception
+                    // The only way we can distinguish this is if we get the timeout exception before the specified
+                    // timeout has
+                    // elapsed, so we simply check this (within 10%) ...
+                    long duration = clock.currentTimeInMillis() - started;
+                    if (duration > (0.9 * timeout)) {
+                        double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
+                        throw new DebeziumException(
+                                "Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
+                                        connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+                                        + connectorConfig.username() + "'",
+                                e);
+                    }
+                    // Otherwise, we were told to shutdown, so we don't care about the timeout exception
+                } catch (AuthenticationException e) {
+                    throw new DebeziumException("Failed to authenticate to the MySQL database at " +
+                            connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+                            + connectorConfig.username() + "'", e);
+                } catch (Throwable e) {
+                    throw new DebeziumException("Unable to connect to the MySQL database at " +
+                            connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
+                            + connectorConfig.username() + "': " + e.getMessage(), e);
+                }
+            }
+            // Park this thread; the binlog client delivers events on its own threads.
+            while (context.isRunning()) {
+                Thread.sleep(100);
+            }
+        } finally {
+            try {
+                client.disconnect();
+            } catch (Exception e) {
+                LOGGER.info("Exception while stopping binary log client", e);
+            }
+        }
+    }
+
+    private SSLSocketFactory getBinlogSslSocketFactory(MySqlConnectorConfig connectorConfig,
+            MySqlConnection connection) {
+        String acceptedTlsVersion = connection.getSessionVariableForSslVersion();
+        if (!isNullOrEmpty(acceptedTlsVersion)) {
+            SSLMode sslMode = sslModeFor(connectorConfig.sslMode());
+
+            // Keystore settings can be passed via system properties too so we need to read them
+            final String password = System.getProperty("javax.net.ssl.keyStorePassword");
+            final String keyFilename = System.getProperty("javax.net.ssl.keyStore");
+            KeyManager[] keyManagers = null;
+            if (keyFilename != null) {
+                final char[] passwordArray = (password == null) ? null : password.toCharArray();
+                try {
+                    KeyStore ks = KeyStore.getInstance("JKS");
+                    ks.load(new FileInputStream(keyFilename), passwordArray);
+
+                    KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509");
+                    kmf.init(ks, passwordArray);
+
+                    keyManagers = kmf.getKeyManagers();
+                } catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException
+                        | UnrecoverableKeyException e) {
+                    throw new DebeziumException("Could not load keystore", e);
+                }
+            }
+
+            // DBZ-1208 Resembles the logic from the upstream BinaryLogClient, only that
+            // the accepted TLS version is passed to the constructed factory
+            if (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED) {
+                final KeyManager[] finalKMS = keyManagers;
+                return new DefaultSSLSocketFactory(acceptedTlsVersion) {
+
+                    @Override
+                    protected void initSSLContext(SSLContext sc)
+                            throws GeneralSecurityException {
+                        sc.init(finalKMS, new TrustManager[]{
+                                new X509TrustManager() {
+
+                                    @Override
+                                    public void checkClientTrusted(
+                                            X509Certificate[] x509Certificates,
+                                            String s)
+                                            throws CertificateException {
+                                    }
+
+                                    @Override
+                                    public void checkServerTrusted(
+                                            X509Certificate[] x509Certificates,
+                                            String s)
+                                            throws CertificateException {
+                                    }
+
+                                    @Override
+                                    public X509Certificate[] getAcceptedIssuers() {
+                                        return new X509Certificate[0];
+                                    }
+                                }
+                        }, null);

Review Comment:
   If the SSL mode is PREFERRED or REQUIRED and we cannot obtain trust managers (a secure implementation) from the KeyStore, we must fall back to a custom TrustManager (an insecure implementation). Since that cannot be changed here, we need to bypass the security check.
   Reference: https://issues.redhat.com/browse/DBZ-4787, https://codeql.github.com/codeql-query-help/java/java-insecure-trustmanager/



##########
inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java:
##########
@@ -0,0 +1,1290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventData;
+import com.github.shyiko.mysql.binlog.event.EventHeader;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.GtidEventData;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
+import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+import com.github.shyiko.mysql.binlog.network.AuthenticationException;
+import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
+import com.github.shyiko.mysql.binlog.network.SSLMode;
+import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
+import com.github.shyiko.mysql.binlog.network.ServerException;
+import io.debezium.DebeziumException;
+import io.debezium.annotation.SingleThreadAccess;
+import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode;
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
+import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
+import io.debezium.data.Envelope.Operation;
+import io.debezium.function.BlockingConsumer;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
+import io.debezium.relational.TableId;
+import io.debezium.schema.SchemaChangeEvent;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import io.debezium.util.Strings;
+import io.debezium.util.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import static io.debezium.util.Strings.isNullOrEmpty;
+
+/**
+ * Copied from Debezium project to fix
+ * https://github.com/apache/inlong/issues/8307.
+ *
+ * <p>Line 1134-1139 : Adjust GTID merging logic to support recovery of a job that previously
+ * specified a starting offset on start.
+ *
+ * <p>Line 989-1062 : The current version of Debezium is 1.5.4-final, and the logic of the
+ * `getBinlogSslSocketFactory()` method has been upgraded to Debezium 1.9.1 to address security check issues.
+ */
+public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStreamingChangeEventSource.class);
+
+    private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
+
+    private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class);
+    private final BinaryLogClient client;
+    private final MySqlStreamingChangeEventSourceMetrics metrics;
+    private final Clock clock;
+    private final EventProcessingFailureHandlingMode eventDeserializationFailureHandlingMode;
+    private final EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode;
+
+    private int startingRowNumber = 0;
+    private long initialEventsToSkip = 0L;
+    private boolean skipEvent = false;
+    private boolean ignoreDmlEventByGtidSource = false;
+    private final Predicate<String> gtidDmlSourceFilter;
+    private final AtomicLong totalRecordCounter = new AtomicLong();
+    private volatile Map<String, ?> lastOffset = null;
+    private com.github.shyiko.mysql.binlog.GtidSet gtidSet;
+    private final float heartbeatIntervalFactor = 0.8f;
+    private final Map<String, Thread> binaryLogClientThreads = new ConcurrentHashMap<>(4);
+    private final MySqlTaskContext taskContext;
+    private final MySqlConnectorConfig connectorConfig;
+    private final MySqlConnection connection;
+    private final EventDispatcher<TableId> eventDispatcher;
+    private final MySqlOffsetContext offsetContext;
+    private final ErrorHandler errorHandler;
+    private boolean isRestoredFromCheckpoint = false;
+
+    @SingleThreadAccess("binlog client thread")
+    private Instant eventTimestamp;
+
+    /**
+     * An immutable (binlog file name, byte position) coordinate pair.
+     */
+    public static class BinlogPosition {
+
+        final String filename;
+        final long position;
+
+        /**
+         * Create a binlog coordinate.
+         *
+         * @param filename the binlog file name; may not be null
+         * @param position the byte offset within that file
+         */
+        public BinlogPosition(String filename, long position) {
+            assert filename != null;
+
+            this.filename = filename;
+            this.position = position;
+        }
+
+        public String getFilename() {
+            return filename;
+        }
+
+        public long getPosition() {
+            return position;
+        }
+
+        @Override
+        public String toString() {
+            return filename + "/" + position;
+        }
+
+        @Override
+        public int hashCode() {
+            // Same value as the classic 31-based accumulation over (filename, position);
+            // Long.hashCode(x) == (int) (x ^ (x >>> 32)).
+            return 31 * (31 + filename.hashCode()) + Long.hashCode(position);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            final BinlogPosition that = (BinlogPosition) obj;
+            return position == that.position && filename.equals(that.filename);
+        }
+    }
+
+    /**
+     * Emits a single change for the given table. Used by the row handlers to dispatch
+     * per-row data to the downstream event dispatcher.
+     *
+     * @param <T> the type of the per-row payload
+     */
+    @FunctionalInterface
+    private interface BinlogChangeEmitter<T> {
+
+        void emit(TableId tableId, T data) throws InterruptedException;
+    }
+
    /**
     * Creates the streaming change event source and configures the shared
     * {@link BinaryLogClient} obtained from the task context: server id, SSL mode,
     * keep-alive/heartbeat intervals, and the custom event deserializers.
     *
     * @param connectorConfig the connector configuration; may not be null
     * @param offsetContext the starting offset; may be null, in which case an initial
     *            offset is created (with snapshot mode NEVER no snapshot creates one)
     * @param connection the MySQL connection, used here for SSL socket-factory setup
     * @param dispatcher dispatcher used to emit change and heartbeat events
     * @param errorHandler propagates streaming failures to the surrounding framework
     * @param clock clock used for lag metrics and event timestamps
     * @param taskContext task context owning schema state and the binlog client
     * @param metrics streaming metrics updated while reading the binlog
     */
    public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlOffsetContext offsetContext,
            MySqlConnection connection,
            EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock,
            MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics) {

        this.taskContext = taskContext;
        this.connectorConfig = connectorConfig;
        this.connection = connection;
        this.clock = clock;
        this.eventDispatcher = dispatcher;
        this.errorHandler = errorHandler;
        // With snapshot mode NEVER the initial context is not created by snapshot
        this.offsetContext = (offsetContext == null) ? MySqlOffsetContext.initial(connectorConfig) : offsetContext;
        this.metrics = metrics;

        eventDeserializationFailureHandlingMode = connectorConfig.getEventProcessingFailureHandlingMode();
        inconsistentSchemaHandlingMode = connectorConfig.inconsistentSchemaFailureHandlingMode();

        // Set up the log reader ...
        client = taskContext.getBinaryLogClient();
        // BinaryLogClient will overwrite thread names later
        client.setThreadFactory(
                Threads.threadFactory(MySqlConnector.class, connectorConfig.getLogicalName(), "binlog-client", false,
                        false,
                        x -> binaryLogClientThreads.put(x.getName(), x)));
        client.setServerId(connectorConfig.serverId());
        client.setSSLMode(sslModeFor(connectorConfig.sslMode()));
        if (connectorConfig.sslModeEnabled()) {
            SSLSocketFactory sslSocketFactory = getBinlogSslSocketFactory(connectorConfig, connection);
            if (sslSocketFactory != null) {
                client.setSslSocketFactory(sslSocketFactory);
            }
        }
        // Keep-alive and heartbeat configuration for the binlog connection.
        Configuration configuration = connectorConfig.getConfig();
        client.setKeepAlive(configuration.getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
        final long keepAliveInterval = configuration.getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
        client.setKeepAliveInterval(keepAliveInterval);
        // Considering heartbeatInterval should be less than keepAliveInterval, we use the heartbeatIntervalFactor
        // multiply by keepAliveInterval and set the result value to heartbeatInterval.The default value of
        // heartbeatIntervalFactor
        // is 0.8, and we believe the left time (0.2 * keepAliveInterval) is enough to process the packet received from
        // the MySQL server.
        client.setHeartbeatInterval((long) (keepAliveInterval * heartbeatIntervalFactor));

        boolean filterDmlEventsByGtidSource =
                configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
        gtidDmlSourceFilter = filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null;

        // Set up the event deserializer with additional type(s) ...
        final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
        EventDeserializer eventDeserializer = new EventDeserializer() {

            @Override
            public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
                try {
                    // Delegate to the superclass ...
                    Event event = super.nextEvent(inputStream);

                    // We have to record the most recent TableMapEventData for each table number for our custom
                    // deserializers ...
                    if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
                        TableMapEventData tableMapEvent = event.getData();
                        tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
                    }
                    return event;
                }
                // DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
                catch (EventDataDeserializationException edde) {
                    // DBZ-3095 As of Java 15, when reaching EOF in the binlog stream, the polling loop in
                    // BinaryLogClient#listenForEventPackets() keeps returning values != -1 from peek();
                    // this causes the loop to never finish
                    // Propagating the exception (either EOF or socket closed) causes the loop to be aborted
                    // in this case
                    if (edde.getCause() instanceof IOException) {
                        throw edde;
                    }

                    // Build a synthetic INCIDENT event carrying the deserialization failure so
                    // handleServerIncident() can log/skip/fail it per the configured mode.
                    EventHeaderV4 header = new EventHeaderV4();
                    header.setEventType(EventType.INCIDENT);
                    header.setTimestamp(edde.getEventHeader().getTimestamp());
                    header.setServerId(edde.getEventHeader().getServerId());

                    if (edde.getEventHeader() instanceof EventHeaderV4) {
                        header.setEventLength(((EventHeaderV4) edde.getEventHeader()).getEventLength());
                        header.setNextPosition(((EventHeaderV4) edde.getEventHeader()).getNextPosition());
                        header.setFlags(((EventHeaderV4) edde.getEventHeader()).getFlags());
                    }

                    EventData data = new EventDataDeserializationExceptionData(edde);
                    return new Event(header, data);
                }
            }
        };

        // Add our custom deserializers ...
        eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
        eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
        eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
                new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
        eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
                new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
        eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
                new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
        eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
                new RowDeserializers.WriteRowsDeserializer(
                        tableMapEventByTableId).setMayContainExtraInformation(true));
        eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
                new RowDeserializers.UpdateRowsDeserializer(
                        tableMapEventByTableId).setMayContainExtraInformation(true));
        eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
                new RowDeserializers.DeleteRowsDeserializer(
                        tableMapEventByTableId).setMayContainExtraInformation(true));
        client.setEventDeserializer(eventDeserializer);
    }
+
+    protected void onEvent(Event event) {
+        long ts = 0;
+
+        if (event.getHeader().getEventType() == EventType.HEARTBEAT) {
+            // HEARTBEAT events have no timestamp but are fired only when
+            // there is no traffic on the connection which means we are caught-up
+            // https://dev.mysql.com/doc/internals/en/heartbeat-event.html
+            metrics.setMilliSecondsBehindSource(ts);
+            return;
+        }
+
+        // MySQL has seconds resolution but mysql-binlog-connector-java returns
+        // a value in milliseconds
+        long eventTs = event.getHeader().getTimestamp();
+
+        if (eventTs == 0) {
+            LOGGER.trace("Received unexpected event with 0 timestamp: {}", event);
+            return;
+        }
+
+        ts = clock.currentTimeInMillis() - eventTs;
+        LOGGER.trace("Current milliseconds behind source: {} ms", ts);
+        metrics.setMilliSecondsBehindSource(ts);
+    }
+
    /**
     * Fallback handler for event types without a registered handler; logs and drops
     * the event. Used as the default in {@code handleEvent()}.
     */
    protected void ignoreEvent(Event event) {
        LOGGER.trace("Ignoring event due to missing handler: {}", event);
    }
+
    /**
     * Central event callback: records the event's binlog position in the offset
     * context and forwards the event to the handler registered for its type.
     * Handler failures are propagated via the {@code errorHandler} and all further
     * events are ignored by clearing the handler map.
     *
     * @param event the binlog event; null events are silently skipped
     */
    protected void handleEvent(Event event) {
        if (event == null) {
            return;
        }

        final EventHeader eventHeader = event.getHeader();
        // Update the source offset info. Note that the client returns the value in *milliseconds*, even though the
        // binlog
        // contains only *seconds* precision ...
        // HEARTBEAT events have no timestamp; only set the timestamp if the event is not a HEARTBEAT
        eventTimestamp = !eventHeader.getEventType().equals(EventType.HEARTBEAT)
                ? Instant.ofEpochMilli(eventHeader.getTimestamp())
                : null;
        offsetContext.setBinlogThread(eventHeader.getServerId());

        final EventType eventType = eventHeader.getEventType();
        if (eventType == EventType.ROTATE) {
            // A ROTATE event switches to a (possibly new) binlog file; record the new
            // file/position as the offset's starting point.
            EventData eventData = event.getData();
            RotateEventData rotateEventData;
            if (eventData instanceof EventDeserializer.EventDataWrapper) {
                rotateEventData = (RotateEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal();
            } else {
                rotateEventData = (RotateEventData) eventData;
            }
            offsetContext.setBinlogStartPoint(rotateEventData.getBinlogFilename(), rotateEventData.getBinlogPosition());
        } else if (eventHeader instanceof EventHeaderV4) {
            // For ordinary events track the position and length so restarts resume correctly.
            EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
            offsetContext.setEventPosition(trackableEventHeader.getPosition(), trackableEventHeader.getEventLength());
        }

        // If there is a handler for this event, forward the event to it ...
        try {
            // Forward the event to the handler ...
            eventHandlers.getOrDefault(eventType, this::ignoreEvent).accept(event);

            // Generate heartbeat message if the time is right
            eventDispatcher.dispatchHeartbeatEvent(offsetContext);

            // Capture that we've completed another event ...
            offsetContext.completeEvent();

            if (skipEvent) {
                // We're in the mode of skipping events and we just skipped this one, so decrement our skip count ...
                --initialEventsToSkip;
                skipEvent = initialEventsToSkip > 0;
            }
        } catch (RuntimeException e) {
            // There was an error in the event handler, so propagate the failure to Kafka Connect ...
            logStreamingSourceState();
            errorHandler.setProducerThrowable(new DebeziumException("Error processing binlog event", e));
            // Do not stop the client, since Kafka Connect should stop the connector on it's own
            // (and doing it here may cause problems the second time it is stopped).
            // We can clear the listeners though so that we ignore all future events ...
            eventHandlers.clear();
            LOGGER.info(
                    "Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored.");
        } catch (InterruptedException e) {
            // Most likely because this reader was stopped and our thread was interrupted ...
            Thread.currentThread().interrupt();
            eventHandlers.clear();
            LOGGER.info("Stopped processing binlog events due to thread interruption");
        }
    }
+
+    @SuppressWarnings("unchecked")
+    protected <T extends EventData> T unwrapData(Event event) {
+        EventData eventData = event.getData();
+        if (eventData instanceof EventDeserializer.EventDataWrapper) {
+            eventData = ((EventDeserializer.EventDataWrapper) eventData).getInternal();
+        }
+        return (T) eventData;
+    }
+
+    /**
+     * Handle the supplied event that signals that mysqld has stopped.
+     *
+     * @param event the server stopped event to be processed; may not be null
+     */
+    protected void handleServerStop(Event event) {
+        LOGGER.debug("Server stopped: {}", event);
+    }
+
+    /**
+     * Handle the supplied event that is sent by a primary to a replica to let the replica know that the primary is still alive. Not
+     * written to a binary log.
+     *
+     * @param event the server stopped event to be processed; may not be null
+     */
+    protected void handleServerHeartbeat(Event event) {
+        LOGGER.trace("Server heartbeat: {}", event);
+    }
+
+    /**
+     * Handle the supplied event that signals that an out of the ordinary event that occurred on the master. It notifies the replica
+     * that something happened on the primary that might cause data to be in an inconsistent state.
+     *
+     * @param event the server stopped event to be processed; may not be null
+     */
+    protected void handleServerIncident(Event event) {
+        if (event.getData() instanceof EventDataDeserializationExceptionData) {
+            metrics.onErroneousEvent("source = " + event.toString());
+            EventDataDeserializationExceptionData data = event.getData();
+
+            EventHeaderV4 eventHeader = (EventHeaderV4) data.getCause().getEventHeader(); // safe cast, instantiated
+                                                                                          // that ourselves
+
+            // logging some additional context but not the exception itself, this will happen in handleEvent()
+            if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+                LOGGER.error(
+                        "Error while deserializing binlog event at offset {}.{}" +
+                                "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        offsetContext.getOffset(),
+                        System.lineSeparator(),
+                        eventHeader.getPosition(),
+                        eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename());
+
+                throw new RuntimeException(data.getCause());
+            } else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+                LOGGER.warn(
+                        "Error while deserializing binlog event at offset {}.{}" +
+                                "This exception will be ignored and the event be skipped.{}" +
+                                "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        offsetContext.getOffset(),
+                        System.lineSeparator(),
+                        System.lineSeparator(),
+                        eventHeader.getPosition(),
+                        eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename(),
+                        data.getCause());
+            }
+        } else {
+            LOGGER.error("Server incident: {}", event);
+        }
+    }
+
+    /**
+     * Handle the supplied event with a {@link RotateEventData} that signals the logs are being rotated. This means that either
+     * the server was restarted, or the binlog has transitioned to a new file. In either case, subsequent table numbers will be
+     * different than those seen to this point.
+     *
+     * @param event the database change data event to be processed; may not be null
+     */
+    protected void handleRotateLogsEvent(Event event) {
+        LOGGER.debug("Rotating logs: {}", event);
+        RotateEventData command = unwrapData(event);
+        assert command != null;
+        taskContext.getSchema().clearTableMappings();
+    }
+
+    /**
+     * Handle the supplied event with a {@link GtidEventData} that signals the beginning of a GTID transaction.
+     * We don't yet know whether this transaction contains any events we're interested in, but we have to record
+     * it so that we know the position of this event and know we've processed the binlog to this point.
+     * <p>
+     * Note that this captures the current GTID and complete GTID set, regardless of whether the connector is
+     * {@link MySqlTaskContext#gtidSourceFilter() filtering} the GTID set upon connection. We do this because
+     * we actually want to capture all GTID set values found in the binlog, whether or not we process them.
+     * However, only when we connect do we actually want to pass to MySQL only those GTID ranges that are applicable
+     * per the configuration.
+     *
+     * @param event the GTID event to be processed; may not be null
+     */
+    protected void handleGtidEvent(Event event) {
+        LOGGER.debug("GTID transaction: {}", event);
+        GtidEventData gtidEvent = unwrapData(event);
+        String gtid = gtidEvent.getGtid();
+        gtidSet.add(gtid);
+        offsetContext.startGtid(gtid, gtidSet.toString()); // rather than use the client's GTID set
+        ignoreDmlEventByGtidSource = false;
+        if (gtidDmlSourceFilter != null && gtid != null) {
+            String uuid = gtid.trim().substring(0, gtid.indexOf(":"));
+            if (!gtidDmlSourceFilter.test(uuid)) {
+                ignoreDmlEventByGtidSource = true;
+            }
+        }
+        metrics.onGtidChange(gtid);
+    }
+
+    /**
+     * Handle the supplied event with an {@link RowsQueryEventData} by recording the original SQL query
+     * that generated the event.
+     *
+     * @param event the database change data event to be processed; may not be null
+     */
+    protected void handleRowsQuery(Event event) {
+        // Unwrap the RowsQueryEvent
+        final RowsQueryEventData lastRowsQueryEventData = unwrapData(event);
+
+        // Set the query on the source
+        offsetContext.setQuery(lastRowsQueryEventData.getQuery());
+    }
+
+    /**
+     * Handle the supplied event with an {@link QueryEventData} by possibly recording the DDL statements as changes in the
+     * MySQL schemas.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while recording the DDL statements
+     */
+    protected void handleQueryEvent(Event event) throws InterruptedException {
+        QueryEventData command = unwrapData(event);
+        LOGGER.debug("Received query command: {}", event);
+        String sql = command.getSql().trim();
+        if (sql.equalsIgnoreCase("BEGIN")) {
+            // We are starting a new transaction ...
+            offsetContext.startNextTransaction();
+            eventDispatcher.dispatchTransactionStartedEvent(offsetContext.getTransactionId(), offsetContext);
+            offsetContext.setBinlogThread(command.getThreadId());
+            if (initialEventsToSkip != 0) {
+                LOGGER.debug(
+                        "Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event",
+                        initialEventsToSkip, startingRowNumber);
+                // We are restarting, so we need to skip the events in this transaction that we processed previously...
+                skipEvent = true;
+            }
+            return;
+        }
+        if (sql.equalsIgnoreCase("COMMIT")) {
+            handleTransactionCompletion(event);
+            return;
+        }
+
+        String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
+
+        if (upperCasedStatementBegin.startsWith("XA ")) {
+            // This is an XA transaction, and we currently ignore these and do nothing ...
+            return;
+        }
+        if (connectorConfig.getDdlFilter().test(sql)) {
+            LOGGER.debug("DDL '{}' was filtered out of processing", sql);
+            return;
+        }
+        if (upperCasedStatementBegin.equals("INSERT ") || upperCasedStatementBegin.equals("UPDATE ")
+                || upperCasedStatementBegin.equals("DELETE ")) {
+            if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+                throw new DebeziumException(
+                        "Received DML '" + sql
+                                + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
+            } else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+                LOGGER.warn("Warning only: Received DML '" + sql
+                        + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
+                return;
+            } else {
+                return;
+            }
+        }
+        if (sql.equalsIgnoreCase("ROLLBACK")) {
+            // We have hit a ROLLBACK which is not supported
+            LOGGER.warn(
+                    "Rollback statements cannot be handled without binlog buffering, the connector will fail. Please check '{}' to see how to enable buffering",
+                    MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());
+        }
+
+        final List<SchemaChangeEvent> schemaChangeEvents =
+                taskContext.getSchema().parseStreamingDdl(sql, command.getDatabase(), offsetContext,
+                        clock.currentTimeAsInstant());
+        try {
+            for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
+                if (taskContext.getSchema().skipSchemaChangeEvent(schemaChangeEvent)) {
+                    continue;
+                }
+
+                final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null
+                        : schemaChangeEvent.getTables().iterator().next().id();
+                eventDispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> {
+                    try {
+                        receiver.schemaChangeEvent(schemaChangeEvent);
+                    } catch (Exception e) {
+                        throw new DebeziumException(e);
+                    }
+                });
+            }
+        } catch (InterruptedException e) {
+            LOGGER.info("Processing interrupted");
+        }
+    }
+
    /**
     * Completes the current transaction: emits the transaction-committed event,
     * records the commit in the offset context and resets the per-transaction
     * skip flags used for restart handling and GTID source filtering.
     */
    private void handleTransactionCompletion(Event event) throws InterruptedException {
        // We are completing the transaction ...
        eventDispatcher.dispatchTransactionCommittedEvent(offsetContext);
        offsetContext.commitTransaction();
        // -1 marks "no active server thread" between transactions
        offsetContext.setBinlogThread(-1L);
        skipEvent = false;
        ignoreDmlEventByGtidSource = false;
    }
+
+    /**
+     * Handle a change in the table metadata.
+     * <p>
+     * This method should be called whenever we consume a TABLE_MAP event, and every transaction in the log should include one
+     * of these for each table affected by the transaction. Each table map event includes a monotonically-increasing numeric
+     * identifier, and this identifier is used within subsequent events within the same transaction. This table identifier can
+     * change when:
+     * <ol>
+     * <li>the table structure is modified (e.g., via an {@code ALTER TABLE ...} command); or</li>
+     * <li>MySQL rotates to a new binary log file, even if the table structure does not change.</li>
+     * </ol>
+     *
+     * @param event the update event; never null
+     */
+    protected void handleUpdateTableMetadata(Event event) {
+        TableMapEventData metadata = unwrapData(event);
+        long tableNumber = metadata.getTableId();
+        String databaseName = metadata.getDatabase();
+        String tableName = metadata.getTable();
+        TableId tableId = new TableId(databaseName, null, tableName);
+        if (taskContext.getSchema().assignTableNumber(tableNumber, tableId)) {
+            LOGGER.debug("Received update table metadata event: {}", event);
+        } else {
+            informAboutUnknownTableIfRequired(event, tableId, "update table metadata");
+        }
+    }
+
+    /**
+     * If we receive an event for a table that is monitored but whose metadata we
+     * don't know, either ignore that event or raise a warning or error as per the
+     * {@link MySqlConnectorConfig#INCONSISTENT_SCHEMA_HANDLING_MODE} configuration.
+     */
+    private void informAboutUnknownTableIfRequired(Event event, TableId tableId, String typeToLog) {
+        if (tableId != null && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
+            metrics.onErroneousEvent("source = " + tableId + ", event " + event);
+            EventHeaderV4 eventHeader = event.getHeader();
+
+            if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
+                LOGGER.error(
+                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        event, offsetContext.getOffset(), tableId, System.lineSeparator(), eventHeader.getPosition(),
+                        eventHeader.getNextPosition(), offsetContext.getSource().binlogFilename());
+                throw new DebeziumException("Encountered change event for table " + tableId
+                        + " whose schema isn't known to this connector");
+            } else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) {
+                LOGGER.warn(
+                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+                                + "The event will be ignored.{}"
+                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        event, offsetContext.getOffset(), tableId, System.lineSeparator(), System.lineSeparator(),
+                        eventHeader.getPosition(), eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename());
+            } else {
+                LOGGER.debug(
+                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+                                + "The event will be ignored.{}"
+                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
+                        event, offsetContext.getOffset(), tableId, System.lineSeparator(), System.lineSeparator(),
+                        eventHeader.getPosition(), eventHeader.getNextPosition(),
+                        offsetContext.getSource().binlogFilename());
+            }
+        } else {
+            LOGGER.debug("Filtering {} event: {} for non-monitored table {}", typeToLog, event, tableId);
+            metrics.onFilteredEvent("source = " + tableId);
+        }
+    }
+
+    /**
+     * Generate source records for the supplied event with an {@link WriteRowsEventData}.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void handleInsert(Event event) throws InterruptedException {
+        handleChange(event, "insert", WriteRowsEventData.class, x -> taskContext.getSchema().getTableId(x.getTableId()),
+                WriteRowsEventData::getRows,
+                (tableId, row) -> eventDispatcher.dispatchDataChangeEvent(tableId,
+                        new MySqlChangeRecordEmitter(offsetContext, clock, Operation.CREATE, null, row)));
+    }
+
+    /**
+     * Generate source records for the supplied event with an {@link UpdateRowsEventData}.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void handleUpdate(Event event) throws InterruptedException {
+        handleChange(event, "update", UpdateRowsEventData.class,
+                x -> taskContext.getSchema().getTableId(x.getTableId()), UpdateRowsEventData::getRows,
+                (tableId, row) -> eventDispatcher.dispatchDataChangeEvent(tableId,
+                        new MySqlChangeRecordEmitter(offsetContext, clock, Operation.UPDATE, row.getKey(),
+                                row.getValue())));
+    }
+
+    /**
+     * Generate source records for the supplied event with an {@link DeleteRowsEventData}.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void handleDelete(Event event) throws InterruptedException {
+        handleChange(event, "delete", DeleteRowsEventData.class,
+                x -> taskContext.getSchema().getTableId(x.getTableId()), DeleteRowsEventData::getRows,
+                (tableId, row) -> eventDispatcher.dispatchDataChangeEvent(tableId,
+                        new MySqlChangeRecordEmitter(offsetContext, clock, Operation.DELETE, row, null)));
+    }
+
    /**
     * Shared implementation for insert/update/delete row events: resolves the
     * table, emits one change record per row (starting at {@code startingRowNumber}
     * when resuming a partially-processed event) and advances the offset context.
     *
     * @param <T> the event data type
     * @param <U> the per-row payload type
     * @param event the row event; may not be null
     * @param changeType "insert", "update" or "delete", used for logging
     * @param eventDataClass the expected event data class
     * @param tableIdProvider resolves the table id from the event data
     * @param rowsProvider extracts the row list from the event data
     * @param changeEmitter emits one change record per row
     * @throws InterruptedException if this thread is interrupted while emitting
     */
    private <T extends EventData, U> void handleChange(Event event, String changeType, Class<T> eventDataClass,
            TableIdProvider<T> tableIdProvider,
            RowsProvider<T, U> rowsProvider, BinlogChangeEmitter<U> changeEmitter)
            throws InterruptedException {
        if (skipEvent) {
            // We can skip this because we should already be at least this far ...
            LOGGER.info("Skipping previously processed row event: {}", event);
            return;
        }
        if (ignoreDmlEventByGtidSource) {
            LOGGER.debug("Skipping DML event because this GTID source is filtered: {}", event);
            return;
        }
        final T data = unwrapData(event);
        final TableId tableId = tableIdProvider.getTableId(data);
        final List<U> rows = rowsProvider.getRows(data);

        if (tableId != null && taskContext.getSchema().schemaFor(tableId) != null) {
            int count = 0;
            int numRows = rows.size();
            if (startingRowNumber < numRows) {
                // Record the row number before emitting so a restart resumes at the right row.
                for (int row = startingRowNumber; row != numRows; ++row) {
                    offsetContext.setRowNumber(row, numRows);
                    offsetContext.event(tableId, eventTimestamp);
                    changeEmitter.emit(tableId, rows.get(row));
                    count++;
                }
                if (LOGGER.isDebugEnabled()) {
                    if (startingRowNumber != 0) {
                        LOGGER.debug("Emitted {} {} record(s) for last {} row(s) in event: {}",
                                count, changeType, numRows - startingRowNumber, event);
                    } else {
                        LOGGER.debug("Emitted {} {} record(s) for event: {}", count, changeType, event);
                    }
                }
                offsetContext.changeEventCompleted();
            } else {
                // All rows were previously processed ...
                LOGGER.debug("Skipping previously processed {} event: {}", changeType, event);
            }
        } else {
            informAboutUnknownTableIfRequired(event, tableId, changeType + " row");
        }
        // The partial-restart row offset only applies to the first event after resuming.
        startingRowNumber = 0;
    }
+
+    /**
+     * Handle a {@link EventType#VIEW_CHANGE} event.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void viewChange(Event event) throws InterruptedException {
+        LOGGER.debug("View Change event: {}", event);
+        // do nothing
+    }
+
+    /**
+     * Handle a {@link EventType#XA_PREPARE} event.
+     *
+     * @param event the database change data event to be processed; may not be null
+     * @throws InterruptedException if this thread is interrupted while blocking
+     */
+    protected void prepareTransaction(Event event) throws InterruptedException {
+        LOGGER.debug("XA Prepare event: {}", event);
+        // do nothing
+    }
+
+    private SSLMode sslModeFor(SecureConnectionMode mode) {
+        switch (mode) {
+            case DISABLED:
+                return SSLMode.DISABLED;
+            case PREFERRED:
+                return SSLMode.PREFERRED;
+            case REQUIRED:
+                return SSLMode.REQUIRED;
+            case VERIFY_CA:
+                return SSLMode.VERIFY_CA;
+            case VERIFY_IDENTITY:
+                return SSLMode.VERIFY_IDENTITY;
+        }
+        return null;
+    }
+
    /**
     * Run the binlog streaming phase: register per-event-type handlers, position the
     * {@code BinaryLogClient} (via GTID set when the server has GTID mode enabled, otherwise
     * via binlog filename/position from the offset), connect, and then block until the
     * change event source context signals shutdown. The client is disconnected in all cases.
     *
     * NOTE(review): handler registration order and the connect/keepalive wait are
     * timing-sensitive; do not reorder these statements without consulting upstream Debezium.
     *
     * @param context coordination handle used to check whether streaming should keep running
     * @throws InterruptedException if this thread is interrupted while waiting/sleeping
     */
    @Override
    public void execute(ChangeEventSourceContext context) throws InterruptedException {
        if (!connectorConfig.getSnapshotMode().shouldStream()) {
            LOGGER.info("Streaming is disabled for snapshot mode {}", connectorConfig.getSnapshotMode());
            return;
        }
        taskContext.getSchema().assureNonEmptySchema();
        final Set<Operation> skippedOperations = connectorConfig.getSkippedOps();

        // Register our event handlers ...
        eventHandlers.put(EventType.STOP, this::handleServerStop);
        eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
        eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
        eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
        eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
        eventHandlers.put(EventType.QUERY, this::handleQueryEvent);

        // Row-level handlers are only installed for operations the connector is not configured to skip.
        if (!skippedOperations.contains(Operation.CREATE)) {
            eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
            eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
        }

        if (!skippedOperations.contains(Operation.UPDATE)) {
            eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
            eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
        }

        if (!skippedOperations.contains(Operation.DELETE)) {
            eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
            eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
        }

        eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
        eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
        eventHandlers.put(EventType.XID, this::handleTransactionCompletion);

        // Conditionally register ROWS_QUERY handler to parse SQL statements.
        if (connectorConfig.includeSqlQuery()) {
            eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
        }

        // With a zero buffer size events are dispatched directly; otherwise they pass through
        // an EventBuffer that defers emitting rows of large transactions.
        client.registerEventListener(connectorConfig.bufferSizeForStreamingChangeEventSource() == 0
                ? this::handleEvent
                : (new EventBuffer(connectorConfig.bufferSizeForStreamingChangeEventSource(), this, context))::add);

        client.registerLifecycleListener(new ReaderThreadLifecycleListener());
        client.registerEventListener(this::onEvent);
        if (LOGGER.isDebugEnabled()) {
            client.registerEventListener(this::logEvent);
        }

        final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
        metrics.setIsGtidModeEnabled(isGtidModeEnabled);

        // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium
        // checkpoint.
        String availableServerGtidStr = connection.knownGtidSet();
        if (isGtidModeEnabled) {
            // The server is using GTIDs, so enable the handler ...
            eventHandlers.put(EventType.GTID, this::handleGtidEvent);

            // Now look at the GTID set from the server and what we've previously seen ...
            GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);

            // also take into account purged GTID logs
            GtidSet purgedServerGtidSet = connection.purgedGtidSet();
            LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet);

            GtidSet filteredGtidSet = filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
            if (filteredGtidSet != null) {
                // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
                LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
                String filteredGtidSetStr = filteredGtidSet.toString();
                client.setGtidSet(filteredGtidSetStr);
                offsetContext.setCompletedGtidSet(filteredGtidSetStr);
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
            } else {
                // We've not yet seen any GTIDs, so that means we have to start reading the binlog from the beginning
                // ...
                client.setBinlogFilename(offsetContext.getSource().binlogFilename());
                client.setBinlogPosition(offsetContext.getSource().binlogPosition());
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
            }
        } else {
            // The server is not using GTIDs, so start reading the binlog based upon where we last left off ...
            client.setBinlogFilename(offsetContext.getSource().binlogFilename());
            client.setBinlogPosition(offsetContext.getSource().binlogPosition());
        }

        // We may be restarting in the middle of a transaction, so see how far into the transaction we have already
        // processed...
        initialEventsToSkip = offsetContext.eventsToSkipUponRestart();
        LOGGER.info("Skip {} events on streaming start", initialEventsToSkip);

        // Set the starting row number, which is the next row number to be read ...
        startingRowNumber = offsetContext.rowsToSkipUponRestart();
        LOGGER.info("Skip {} rows on streaming start", startingRowNumber);

        // Only when we reach the first BEGIN event will we start to skip events ...
        skipEvent = false;

        try {
            // Start the log reader, which starts background threads ...
            if (context.isRunning()) {
                long timeout = connectorConfig.getConnectionTimeout().toMillis();
                long started = clock.currentTimeInMillis();
                try {
                    LOGGER.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
                    client.connect(timeout);
                    // Need to wait for keepalive thread to be running, otherwise it can be left orphaned
                    // The problem is with timing. When the close is called too early after connect then
                    // the keepalive thread is not terminated
                    if (client.isKeepAlive()) {
                        LOGGER.info("Waiting for keepalive thread to start");
                        final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock);
                        // Poll up to 50 times at 100 ms intervals (~5 s) for the keepalive thread.
                        int waitAttempts = 50;
                        boolean keepAliveThreadRunning = false;
                        while (!keepAliveThreadRunning && waitAttempts-- > 0) {
                            for (Thread t : binaryLogClientThreads.values()) {
                                if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) {
                                    LOGGER.info("Keepalive thread is running");
                                    keepAliveThreadRunning = true;
                                }
                            }
                            metronome.pause();
                        }
                    }
                } catch (TimeoutException e) {
                    // If the client thread is interrupted *before* the client could connect, the client throws a
                    // timeout exception
                    // The only way we can distinguish this is if we get the timeout exception before the specified
                    // timeout has
                    // elapsed, so we simply check this (within 10%) ...
                    long duration = clock.currentTimeInMillis() - started;
                    if (duration > (0.9 * timeout)) {
                        double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
                        throw new DebeziumException(
                                "Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
                                        connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
                                        + connectorConfig.username() + "'",
                                e);
                    }
                    // Otherwise, we were told to shutdown, so we don't care about the timeout exception
                } catch (AuthenticationException e) {
                    throw new DebeziumException("Failed to authenticate to the MySQL database at " +
                            connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
                            + connectorConfig.username() + "'", e);
                } catch (Throwable e) {
                    throw new DebeziumException("Unable to connect to the MySQL database at " +
                            connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
                            + connectorConfig.username() + "': " + e.getMessage(), e);
                }
            }
            // Event processing happens on the client's listener threads; this thread just
            // idles until the context asks us to stop.
            while (context.isRunning()) {
                Thread.sleep(100);
            }
        } finally {
            try {
                client.disconnect();
            } catch (Exception e) {
                LOGGER.info("Exception while stopping binary log client", e);
            }
        }
    }
+
+    private SSLSocketFactory getBinlogSslSocketFactory(MySqlConnectorConfig connectorConfig,
+            MySqlConnection connection) {
+        String acceptedTlsVersion = connection.getSessionVariableForSslVersion();
+        if (!isNullOrEmpty(acceptedTlsVersion)) {
+            SSLMode sslMode = sslModeFor(connectorConfig.sslMode());
+
+            // Keystore settings can be passed via system properties too so we need to read them
+            final String password = System.getProperty("javax.net.ssl.keyStorePassword");
+            final String keyFilename = System.getProperty("javax.net.ssl.keyStore");
+            final String trustPassword = System.getProperty("javax.net.ssl.trustStorePassword");
+            final String trustFilename = System.getProperty("javax.net.ssl.trustStore");
+            KeyManager[] keyManagers = null;
+            if (keyFilename != null) {
+                final char[] passwordArray = (password == null) ? null : password.toCharArray();
+                try {
+                    KeyStore ks = loadKeyStore(keyFilename, passwordArray);
+
+                    KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509");
+                    kmf.init(ks, passwordArray);
+
+                    keyManagers = kmf.getKeyManagers();
+                } catch (KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException e) {
+                    throw new DebeziumException("Could not load keystore", e);
+                }
+            }
+
+            TrustManager[] trustManagers;
+            try {
+                KeyStore ks = null;
+                if (trustFilename != null) {
+                    final char[] trustPasswordArray = (trustPassword == null) ? null : trustPassword.toCharArray();
+                    ks = loadKeyStore(trustFilename, trustPasswordArray);
+                }
+
+                if (ks == null && (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED)) {
+                    trustManagers = new TrustManager[]{
+                            new X509TrustManager() {
+
+                                @Override
+                                public void checkClientTrusted(X509Certificate[] x509Certificates, String s)
+                                        throws CertificateException {
+                                }
+
+                                @Override
+                                public void checkServerTrusted(X509Certificate[] x509Certificates, String s)
+                                        throws CertificateException {
+                                }
+
+                                @Override
+                                public X509Certificate[] getAcceptedIssuers() {
+                                    return new X509Certificate[0];
+                                }
+                            }
+                    };
+                } else {
+                    TrustManagerFactory tmf =
+                            TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+                    tmf.init(ks);
+                    trustManagers = tmf.getTrustManagers();
+                }
+            } catch (KeyStoreException | NoSuchAlgorithmException e) {
+                throw new DebeziumException("Could not load truststore", e);
+            }
+            // DBZ-1208 Resembles the logic from the upstream BinaryLogClient, only that
+            // the accepted TLS version is passed to the constructed factory
+            final KeyManager[] finalKMS = keyManagers;
+            return new DefaultSSLSocketFactory(acceptedTlsVersion) {
+
+                @Override
+                protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
+                    sc.init(finalKMS, trustManagers, null);

Review Comment:
   If the SSL mode is PREFERRED or REQUIRED and no trust managers (a secure implementation) can be obtained from a trust store, we must fall back to a custom TrustManager that accepts all server certificates (an insecure implementation). This mirrors the upstream BinaryLogClient behavior and cannot be changed here, so the static-analysis security check needs to be suppressed.
   Reference: https://issues.redhat.com/browse/DBZ-4787, https://codeql.github.com/codeql-query-help/java/java-insecure-trustmanager/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org