You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 09:24:10 UTC
[inlong] branch master updated: [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6545e4f9e [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)
6545e4f9e is described below
commit 6545e4f9e474a776b8178604c4048aa661793fff
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Jan 4 17:24:03 2023 +0800
[INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)
Co-authored-by: stingpeng <st...@tencent.com>
---
.../assigners/MySqlSnapshotSplitAssigner.java | 24 +++++++++++++---------
1 file changed, 14 insertions(+), 10 deletions(-)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 498edec76..d270fa792 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -205,17 +205,21 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
executor.submit(
() -> {
- Iterator<TableId> iterator = remainingTables.iterator();
- while (iterator.hasNext()) {
- TableId nextTable = iterator.next();
- // split the given table into chunks (snapshot splits)
- Collection<MySqlSnapshotSplit> splits =
- chunkSplitter.generateSplits(nextTable);
- synchronized (lock) {
- remainingSplits.addAll(splits);
- remainingTables.remove(nextTable);
- lock.notify();
+ try {
+ Iterator<TableId> iterator = remainingTables.iterator();
+ while (iterator.hasNext()) {
+ TableId nextTable = iterator.next();
+ // split the given table into chunks (snapshot splits)
+ Collection<MySqlSnapshotSplit> splits =
+ chunkSplitter.generateSplits(nextTable);
+ synchronized (lock) {
+ remainingSplits.addAll(splits);
+ remainingTables.remove(nextTable);
+ lock.notify();
+ }
}
+ } catch (Exception e) {
+ LOG.error("asynchronously split exit with exception", e);
}
});
}