You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/14 02:06:35 UTC

[incubator-seatunnel] branch dev updated: [Bug][CDC] Fix concurrent modify of splits (#3937)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 29b04e240 [Bug][CDC] Fix concurrent modify of splits (#3937)
29b04e240 is described below

commit 29b04e24058acd76b32873e7fd18a42af8e3e44b
Author: hailin0 <wa...@apache.org>
AuthorDate: Sat Jan 14 10:06:29 2023 +0800

    [Bug][CDC] Fix concurrent modify of splits (#3937)
---
 .../cdc/base/source/enumerator/IncrementalSourceEnumerator.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
index cee57bdd0..4aed2836b 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -67,13 +67,13 @@ public class IncrementalSourceEnumerator
     }
 
     @Override
-    public void run() throws Exception {
+    public synchronized void run() throws Exception {
         this.running = true;
         assignSplits();
     }
 
     @Override
-    public void handleSplitRequest(int subtaskId) {
+    public synchronized void handleSplitRequest(int subtaskId) {
         if (!context.registeredReaders().contains(subtaskId)) {
             // reader failed between sending the request and now. skip this request.
             return;
@@ -128,7 +128,7 @@ public class IncrementalSourceEnumerator
     }
 
     @Override
-    public void notifyCheckpointComplete(long checkpointId) {
+    public synchronized void notifyCheckpointComplete(long checkpointId) {
         splitAssigner.notifyCheckpointComplete(checkpointId);
         // incremental split may be available after checkpoint complete
         assignSplits();