You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/18 07:02:32 UTC
[inlong] branch master updated: [INLONG-7855][Sort] Fix Sort hang up reader in snapshot phase when reducing parallelism (#7856)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d33b494c0 [INLONG-7855][Sort] Fix Sort hang up reader in snapshot phase when reducing parallelism (#7856)
d33b494c0 is described below
commit d33b494c00432a301a30c517ba8315a37877d92f
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Tue Apr 18 15:02:24 2023 +0800
[INLONG-7855][Sort] Fix Sort hang up reader in snapshot phase when reducing parallelism (#7856)
---
.../debezium/task/MySqlBinlogSplitReadTask.java | 29 ++++++++++++++++++++++
1 file changed, 29 insertions(+)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
index 44519c00d..967b9a60b 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlBinlogSplitReadTask.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.cdc.mysql.debezium.task;
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener;
import com.github.shyiko.mysql.binlog.event.Event;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
@@ -28,6 +30,7 @@ import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
+import java.util.List;
import org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.EventDispatcherImpl;
import org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher;
import org.apache.inlong.sort.cdc.mysql.debezium.reader.SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl;
@@ -52,6 +55,7 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
private final SignalEventDispatcher signalEventDispatcher;
private final ErrorHandler errorHandler;
private ChangeEventSourceContext context;
+ private final MySqlTaskContext taskContext;
/**
* Constructor of MySqlBinlogSplitReadTask.
@@ -80,14 +84,39 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
this.eventDispatcher = dispatcher;
this.offsetContext = offsetContext;
this.errorHandler = errorHandler;
+ this.taskContext = taskContext;
+
this.signalEventDispatcher =
new SignalEventDispatcher(
offsetContext.getPartition(), topic, eventDispatcher.getQueue());
}
+ /**
+ * Prunes listeners registered on the {@link BinaryLogClient} obtained from
+ * {@code taskContext}, then delegates to {@code super.execute(context)}.
+ *
+ * <p>Every event listener and every lifecycle listener except the one currently last in its
+ * list is unregistered. The listeners removed are described as belonging to the last run of a
+ * reused client; leaving them registered is what this change identifies as the cause of the
+ * reader hanging in the snapshot phase.
+ *
+ * @param context the change event source context; stored in {@code this.context} and passed
+ * unchanged to the superclass
+ * @throws InterruptedException declared by the overridden method; nothing in the pruning code
+ * below throws it directly
+ */
@Override
public void execute(ChangeEventSourceContext context) throws InterruptedException {
this.context = context;
+ // Prune leftover listeners before the superclass starts reading.
+ final BinaryLogClient client = taskContext.getBinaryLogClient();
+ final List<EventListener> eventListeners = client.getEventListeners();
+ final List<BinaryLogClient.LifecycleListener> lifecycleListeners =
+ client.getLifecycleListeners();
+ // NOTE(review): indexOf/size below are re-read on every iteration while listeners are being unregistered; presumably these lists are live views of the client's state - confirm.
+ eventListeners.forEach(
+ listener -> {
+ if (eventListeners.indexOf(listener) != eventListeners.size() - 1) { // skip the listener currently at the tail
+ client.unregisterEventListener(listener);
+ }
+ });
+ lifecycleListeners.forEach(
+ listener -> {
+ if (lifecycleListeners.indexOf(listener) != lifecycleListeners.size() - 1) { // same keep-last rule as for event listeners
+ client.unregisterLifecycleListener(listener);
+ }
+ });
+ // NOTE(review): presumably the surviving (last) listeners are the ones registered for this task - confirm against the superclass.
super.execute(context);
}