You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 09:24:10 UTC

[inlong] branch master updated: [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6545e4f9e [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)
6545e4f9e is described below

commit 6545e4f9e474a776b8178604c4048aa661793fff
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Jan 4 17:24:03 2023 +0800

    [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)
    
    Co-authored-by: stingpeng <st...@tencent.com>
---
 .../assigners/MySqlSnapshotSplitAssigner.java      | 24 +++++++++++++---------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 498edec76..d270fa792 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -205,17 +205,21 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
 
             executor.submit(
                     () -> {
-                        Iterator<TableId> iterator = remainingTables.iterator();
-                        while (iterator.hasNext()) {
-                            TableId nextTable = iterator.next();
-                            // split the given table into chunks (snapshot splits)
-                            Collection<MySqlSnapshotSplit> splits =
-                                    chunkSplitter.generateSplits(nextTable);
-                            synchronized (lock) {
-                                remainingSplits.addAll(splits);
-                                remainingTables.remove(nextTable);
-                                lock.notify();
+                        try {
+                            Iterator<TableId> iterator = remainingTables.iterator();
+                            while (iterator.hasNext()) {
+                                TableId nextTable = iterator.next();
+                                // split the given table into chunks (snapshot splits)
+                                Collection<MySqlSnapshotSplit> splits =
+                                        chunkSplitter.generateSplits(nextTable);
+                                synchronized (lock) {
+                                    remainingSplits.addAll(splits);
+                                    remainingTables.remove(nextTable);
+                                    lock.notify();
+                                }
                             }
+                        } catch (Exception e) {
+                            LOG.error("asynchronously split exit with exception", e);
                         }
                     });
         }