You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/14 02:06:35 UTC
[incubator-seatunnel] branch dev updated: [Bug][CDC] Fix concurrent modify of splits (#3937)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 29b04e240 [Bug][CDC] Fix concurrent modify of splits (#3937)
29b04e240 is described below
commit 29b04e24058acd76b32873e7fd18a42af8e3e44b
Author: hailin0 <wa...@apache.org>
AuthorDate: Sat Jan 14 10:06:29 2023 +0800
[Bug][CDC] Fix concurrent modify of splits (#3937)
---
.../cdc/base/source/enumerator/IncrementalSourceEnumerator.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
index cee57bdd0..4aed2836b 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -67,13 +67,13 @@ public class IncrementalSourceEnumerator
}
@Override
- public void run() throws Exception {
+ public synchronized void run() throws Exception {
this.running = true;
assignSplits();
}
@Override
- public void handleSplitRequest(int subtaskId) {
+ public synchronized void handleSplitRequest(int subtaskId) {
if (!context.registeredReaders().contains(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
@@ -128,7 +128,7 @@ public class IncrementalSourceEnumerator
}
@Override
- public void notifyCheckpointComplete(long checkpointId) {
+ public synchronized void notifyCheckpointComplete(long checkpointId) {
splitAssigner.notifyCheckpointComplete(checkpointId);
// incremental split may be available after checkpoint complete
assignSplits();