You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/18 07:02:32 UTC

[inlong] branch master updated: [INLONG-7855][Sort] Fix Sort hang up reader in snapshot phase when reducing parallelism (#7856)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d33b494c0 [INLONG-7855][Sort] Fix Sort hang up reader in snapshot phase when reducing parallelism (#7856)
d33b494c0 is described below

commit d33b494c00432a301a30c517ba8315a37877d92f
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Tue Apr 18 15:02:24 2023 +0800

    [INLONG-7855][Sort] Fix Sort hang up reader in snapshot phase when reducing parallelism (#7856)
---
 .../debezium/task/MySqlBinlogSplitReadTask.java    | 29 ++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
index 44519c00d..967b9a60b 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.cdc.mysql.debezium.task;
 
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener;
 import com.github.shyiko.mysql.binlog.event.Event;
 import io.debezium.DebeziumException;
 import io.debezium.connector.mysql.MySqlConnection;
@@ -28,6 +30,7 @@ import io.debezium.connector.mysql.MySqlTaskContext;
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.relational.TableId;
 import io.debezium.util.Clock;
+import java.util.List;
 import org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.EventDispatcherImpl;
 import org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher;
 import org.apache.inlong.sort.cdc.mysql.debezium.reader.SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl;
@@ -52,6 +55,7 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
     private final SignalEventDispatcher signalEventDispatcher;
     private final ErrorHandler errorHandler;
     private ChangeEventSourceContext context;
+    private final MySqlTaskContext taskContext;
 
     /**
      * Constructor of MySqlBinlogSplitReadTask.
@@ -80,14 +84,39 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
         this.eventDispatcher = dispatcher;
         this.offsetContext = offsetContext;
         this.errorHandler = errorHandler;
+        this.taskContext = taskContext;
+
         this.signalEventDispatcher =
                 new SignalEventDispatcher(
                         offsetContext.getPartition(), topic, eventDispatcher.getQueue());
     }
 
+    /**
+     * Unregisters the event and lifecycle listeners left on the task's BinaryLogClient by the last
+     * run, skipping the listener at the last index of each list, to fix a snapshot-phase hang.
+     */
     @Override
     public void execute(ChangeEventSourceContext context) throws InterruptedException {
         this.context = context;
+        // Fetch the client from the task context together with its registered listener lists.
+        final BinaryLogClient client = taskContext.getBinaryLogClient();
+        final List<EventListener> eventListeners = client.getEventListeners();
+        final List<BinaryLogClient.LifecycleListener> lifecycleListeners =
+                client.getLifecycleListeners();
+        // Keeps only the last-index listener. NOTE(review): is unregistering mid-forEach safe?
+        eventListeners.forEach(
+                listener -> {
+                    if (eventListeners.indexOf(listener) != eventListeners.size() - 1) {
+                        client.unregisterEventListener(listener);
+                    }
+                });
+        lifecycleListeners.forEach(
+                listener -> {
+                    if (lifecycleListeners.indexOf(listener) != lifecycleListeners.size() - 1) {
+                        client.unregisterLifecycleListener(listener);
+                    }
+                });
+        // Listener cleanup is done; continue with the parent class's execute.
         super.execute(context);
     }