Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/26 16:51:08 UTC

[GitHub] [incubator-seatunnel] ashulin opened a new pull request, #2904: [hotfix][connector][jdbc] fix JDBC split exception

ashulin opened a new pull request, #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904

   
   ## Purpose of this pull request
   
   `context#currentParallelism()` is a runtime method that returns incorrect results when used in `JdbcSourceSplitEnumerator#open`.
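A minimal Java sketch of the difference, assuming a hypothetical `FakeContext` whose `currentParallelism()` returns a placeholder until the runtime fills in the real value (the placeholder of 1 is an assumption, not documented SeaTunnel behavior). `EagerEnumerator` caches the value in its constructor, as the removed code did; `LazyEnumerator` reads it only when splits are computed, as the new `run()` does.

```java
// Hypothetical stand-in for SourceSplitEnumerator.Context: the real
// parallelism is only known after the enumerator has been constructed.
class FakeContext {
    private int parallelism = 1; // assumed placeholder before deployment

    int currentParallelism() {
        return parallelism;
    }

    // Simulates the runtime filling in the actual parallelism later.
    void deploy(int parallelism) {
        this.parallelism = parallelism;
    }
}

// Caches the value at construction time (the pattern this PR removes).
class EagerEnumerator {
    private final int parallelism;

    EagerEnumerator(FakeContext context) {
        this.parallelism = context.currentParallelism();
    }

    int batchNum() {
        return parallelism;
    }
}

// Reads the value at split-calculation time (the pattern this PR adopts).
class LazyEnumerator {
    private final FakeContext context;

    LazyEnumerator(FakeContext context) {
        this.context = context;
    }

    int batchNum() {
        return context.currentParallelism();
    }
}
```

If the context is deployed with a parallelism of 4 after both enumerators are built, the eager one still splits into 1 batch while the lazy one splits into 4.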
   
   
   ## Check list
   
   * [ ] Code changes are covered by tests, or do not need tests for the following reason:
   * [ ] If your PR adds any new JAR binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980697296


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java:
##########
@@ -28,47 +28,81 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
-    private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+
+    private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
+
     private JdbcSourceOptions jdbcSourceOptions;
     private final PartitionParameter partitionParameter;
-    private final int parallelism;
 
     public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
-        this.parallelism = enumeratorContext.currentParallelism();
+        this.pendingSplits = new HashMap<>();
     }
 
     @Override
     public void open() {
+        // No connection needs to be opened
+    }
+
+    @Override
+    public void run() throws Exception {
+        discoverySplits();
+        assignPendingSplits();
+    }
+
+    private void discoverySplits() {
+        List<JdbcSourceSplit> allSplit = new ArrayList<>();
         LOG.info("Starting to calculate splits.");
         if (null != partitionParameter) {
             JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
-                    new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism);
+                new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(enumeratorContext.currentParallelism());
             Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues();
             for (int i = 0; i < parameterValues.length; i++) {
                 allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
             }
         } else {
             allSplit.add(new JdbcSourceSplit(null, 0));
         }
+        int numReaders = enumeratorContext.currentParallelism();
+        for (JdbcSourceSplit split : allSplit) {
+            int ownerReader = split.splitId % numReaders;
+            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
         LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
     }
 
-    @Override
-    public void run() throws Exception {
+    private void assignPendingSplits() {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : enumeratorContext.registeredReaders()) {
+            // Remove pending assignment for the reader
+            final Set<JdbcSourceSplit> pendingAssignmentForReader =
+                pendingSplits.remove(pendingReader);

Review Comment:
   How do we roll back `pendingSplits.remove()`?
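The `discoverySplits()` hunk above divides the partition column range into `currentParallelism()` batches and gives split `i` to reader `i % numReaders`. The sketch below models both steps under stated assumptions: `SplitDiscoverySketch` is hypothetical, its even-split arithmetic (the first `count % batchNum` ranges get one extra value) is an assumption rather than the exact logic of `JdbcNumericBetweenParametersProvider`, and it assumes the range holds at least `batchNum` values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class SplitDiscoverySketch {
    // Splits the inclusive range [min, max] into batchNum contiguous ranges.
    // Assumed even split: the first (count % batchNum) ranges get one extra value.
    static List<long[]> rangeSplits(long min, long max, int batchNum) {
        long count = max - min + 1;
        long base = count / batchNum;
        long extra = count % batchNum;
        List<long[]> ranges = new ArrayList<>();
        long start = min;
        for (int i = 0; i < batchNum; i++) {
            long size = base + (i < extra ? 1 : 0);
            long end = start + size - 1;
            ranges.add(new long[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    // Groups split ids by owner reader using splitId % numReaders, as in the diff.
    static Map<Integer, Set<Integer>> owners(int numSplits, int numReaders) {
        Map<Integer, Set<Integer>> pending = new HashMap<>();
        for (int splitId = 0; splitId < numSplits; splitId++) {
            int ownerReader = splitId % numReaders;
            pending.computeIfAbsent(ownerReader, r -> new TreeSet<>()).add(splitId);
        }
        return pending;
    }
}
```

Because the diff uses the same `currentParallelism()` value for both the batch count and the modulo, each reader owns exactly one range in this model.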





[GitHub] [incubator-seatunnel] ashulin commented on pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
ashulin commented on PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#issuecomment-1258896510

   @hailin0 PTAL




[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980699425


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java:
##########
@@ -28,47 +28,81 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
-    private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+
+    private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
+
     private JdbcSourceOptions jdbcSourceOptions;
     private final PartitionParameter partitionParameter;
-    private final int parallelism;
 
     public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
-        this.parallelism = enumeratorContext.currentParallelism();
+        this.pendingSplits = new HashMap<>();
     }
 
     @Override
     public void open() {
+        // No connection needs to be opened
+    }
+
+    @Override
+    public void run() throws Exception {
+        discoverySplits();
+        assignPendingSplits();
+    }
+
+    private void discoverySplits() {
+        List<JdbcSourceSplit> allSplit = new ArrayList<>();
         LOG.info("Starting to calculate splits.");
         if (null != partitionParameter) {
             JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
-                    new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism);
+                new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(enumeratorContext.currentParallelism());
             Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues();
             for (int i = 0; i < parameterValues.length; i++) {
                 allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
             }
         } else {
             allSplit.add(new JdbcSourceSplit(null, 0));
         }
+        int numReaders = enumeratorContext.currentParallelism();
+        for (JdbcSourceSplit split : allSplit) {
+            int ownerReader = split.splitId % numReaders;
+            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
         LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
     }
 
-    @Override
-    public void run() throws Exception {
+    private void assignPendingSplits() {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : enumeratorContext.registeredReaders()) {
+            // Remove pending assignment for the reader
+            final Set<JdbcSourceSplit> pendingAssignmentForReader =
+                pendingSplits.remove(pendingReader);

Review Comment:
   It will not be rolled back. When an exception occurs, the splits are rediscovered and reassigned.
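A small model of this answer, using a hypothetical `RediscoverySketch` with integer split ids: once `remove()` has taken a reader's entry, a failed assignment does not put it back; recovery instead re-runs discovery, which rebuilds the same owner map because it depends only on the split count and the reader count. The `failingReader` parameter that injects the exception is purely illustrative.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RediscoverySketch {
    final Map<Integer, Set<Integer>> pendingSplits = new HashMap<>();
    final Map<Integer, Set<Integer>> delivered = new HashMap<>();

    // Rebuilds the pending map from scratch; deterministic for the same inputs.
    void discoverySplits(int numSplits, int numReaders) {
        pendingSplits.clear();
        for (int splitId = 0; splitId < numSplits; splitId++) {
            pendingSplits.computeIfAbsent(splitId % numReaders, r -> new HashSet<>()).add(splitId);
        }
    }

    // Removes each reader's entry before handing it over, as in the diff.
    // failingReader simulates an exception during assignment (hypothetical).
    void assignPendingSplits(List<Integer> registeredReaders, int failingReader) {
        for (int reader : registeredReaders) {
            Set<Integer> splits = pendingSplits.remove(reader);
            if (splits == null) {
                continue;
            }
            if (reader == failingReader) {
                throw new IllegalStateException("assignment to reader " + reader + " failed");
            }
            delivered.put(reader, splits);
        }
    }
}
```

After a failure for reader 1, its splits are gone from `pendingSplits`; calling `discoverySplits(...)` again restores them and a second `assignPendingSplits(...)` delivers them.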





[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980801070


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java:
##########
@@ -58,20 +58,22 @@ public void close() throws IOException {
     @Override
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        JdbcSourceSplit split = splits.poll();
-        if (null != split) {
-            inputFormat.open(split);
-            while (!inputFormat.reachedEnd()) {
-                SeaTunnelRow seaTunnelRow = inputFormat.nextRecord();
-                output.collect(seaTunnelRow);
+        synchronized (output.getCheckpointLock()) {

Review Comment:
   Is there any other better way than `synchronized`?





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980688377


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java:
##########
@@ -60,12 +60,14 @@ public void close() throws IOException {
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
         JdbcSourceSplit split = splits.poll();

Review Comment:
   Move this into the synchronized code block (line #64).
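A minimal sketch of the requested change, assuming a hypothetical reader class in which a plain `Object` stands in for `output.getCheckpointLock()` and strings stand in for splits and rows: the poll, the emit, and the snapshot all hold the same lock, so a snapshot sees the split queue either before or after a whole split is processed, never in between.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class CheckpointLockSketch {
    // Stand-in for output.getCheckpointLock() (hypothetical, not SeaTunnel's Collector).
    private final Object checkpointLock = new Object();
    private final Deque<String> splits = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();

    void addSplit(String split) {
        synchronized (checkpointLock) {
            splits.add(split);
        }
    }

    // poll() happens after the lock is acquired, so the split cannot leave the
    // queue while a snapshot is being taken.
    void pollNext() {
        synchronized (checkpointLock) {
            String split = splits.poll();
            if (split != null) {
                emitted.add("rows of " + split);
            }
        }
    }

    // The snapshot takes the same lock, so it never sees a half-processed split.
    List<String> snapshotState() {
        synchronized (checkpointLock) {
            return new ArrayList<>(splits);
        }
    }

    List<String> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }
}
```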



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java:
##########
@@ -28,47 +28,81 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
-    private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+
+    private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
+
     private JdbcSourceOptions jdbcSourceOptions;
     private final PartitionParameter partitionParameter;
-    private final int parallelism;
 
     public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
-        this.parallelism = enumeratorContext.currentParallelism();
+        this.pendingSplits = new HashMap<>();
     }
 
     @Override
     public void open() {
+        // No connection needs to be opened
+    }
+
+    @Override
+    public void run() throws Exception {
+        discoverySplits();
+        assignPendingSplits();
+    }
+
+    private void discoverySplits() {
+        List<JdbcSourceSplit> allSplit = new ArrayList<>();
         LOG.info("Starting to calculate splits.");
         if (null != partitionParameter) {
             JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
-                    new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism);
+                new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(enumeratorContext.currentParallelism());
             Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues();
             for (int i = 0; i < parameterValues.length; i++) {
                 allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
             }
         } else {
             allSplit.add(new JdbcSourceSplit(null, 0));
         }
+        int numReaders = enumeratorContext.currentParallelism();
+        for (JdbcSourceSplit split : allSplit) {
+            int ownerReader = split.splitId % numReaders;
+            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
         LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
     }
 
-    @Override
-    public void run() throws Exception {
+    private void assignPendingSplits() {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : enumeratorContext.registeredReaders()) {
+            // Remove pending assignment for the reader
+            final Set<JdbcSourceSplit> pendingAssignmentForReader =
+                pendingSplits.remove(pendingReader);

Review Comment:
   How do we roll back when an exception is thrown?





[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980701156


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java:
##########
@@ -60,12 +60,14 @@ public void close() throws IOException {
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
         JdbcSourceSplit split = splits.poll();

Review Comment:
   done





[GitHub] [incubator-seatunnel] EricJoy2048 merged pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
EricJoy2048 merged PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980911692


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java:
##########
@@ -58,20 +58,22 @@ public void close() throws IOException {
     @Override
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        JdbcSourceSplit split = splits.poll();
-        if (null != split) {
-            inputFormat.open(split);
-            while (!inputFormat.reachedEnd()) {
-                SeaTunnelRow seaTunnelRow = inputFormat.nextRecord();
-                output.collect(seaTunnelRow);
+        synchronized (output.getCheckpointLock()) {

Review Comment:
   > Is there any other better way than `synchronized`?
   
   We have discussed this many times, and the current conclusion is to use this approach.







[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980698434


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java:
##########
@@ -60,12 +60,14 @@ public void close() throws IOException {
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
         JdbcSourceSplit split = splits.poll();

Review Comment:
   Shouldn't `splits.poll()` be called after the lock is acquired?





[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980694248


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java:
##########
@@ -28,47 +28,81 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
-    private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+
+    private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
+
     private JdbcSourceOptions jdbcSourceOptions;
     private final PartitionParameter partitionParameter;
-    private final int parallelism;
 
     public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
-        this.parallelism = enumeratorContext.currentParallelism();
+        this.pendingSplits = new HashMap<>();
     }
 
     @Override
     public void open() {
+        // No connection needs to be opened
+    }
+
+    @Override
+    public void run() throws Exception {
+        discoverySplits();
+        assignPendingSplits();
+    }
+
+    private void discoverySplits() {
+        List<JdbcSourceSplit> allSplit = new ArrayList<>();
         LOG.info("Starting to calculate splits.");
         if (null != partitionParameter) {
             JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
-                    new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism);
+                new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(enumeratorContext.currentParallelism());
             Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues();
             for (int i = 0; i < parameterValues.length; i++) {
                 allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
             }
         } else {
             allSplit.add(new JdbcSourceSplit(null, 0));
         }
+        int numReaders = enumeratorContext.currentParallelism();
+        for (JdbcSourceSplit split : allSplit) {
+            int ownerReader = split.splitId % numReaders;
+            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
         LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
     }
 
-    @Override
-    public void run() throws Exception {
+    private void assignPendingSplits() {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : enumeratorContext.registeredReaders()) {
+            // Remove pending assignment for the reader
+            final Set<JdbcSourceSplit> pendingAssignmentForReader =
+                pendingSplits.remove(pendingReader);

Review Comment:
   The split state is not saved at the moment, so splits are reassigned after a rollback; delivery is at-least-once.
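A toy model of the at-least-once behavior described above, assuming a split whose rows are replayed from the beginning after a failure because no per-split progress is stored; `AtLeastOnceSketch` and its `failAfter` parameter are hypothetical. Rows emitted before the failure show up twice downstream.

```java
import java.util.ArrayList;
import java.util.List;

class AtLeastOnceSketch {
    // Emits the split's rows into the sink; throws before row `failAfter`
    // (a negative value means the read never fails).
    static void readSplit(List<Integer> rows, List<Integer> sink, int failAfter) {
        for (int i = 0; i < rows.size(); i++) {
            if (i == failAfter) {
                throw new IllegalStateException("reader failed");
            }
            sink.add(rows.get(i));
        }
    }

    static List<Integer> runWithOneFailure(List<Integer> rows, int failAfter) {
        List<Integer> sink = new ArrayList<>();
        try {
            readSplit(rows, sink, failAfter);
        } catch (IllegalStateException e) {
            // No split progress was saved, so the reassigned split is read from the start.
            readSplit(rows, sink, -1);
        }
        return sink;
    }
}
```

With rows `[1, 2, 3]` and `failAfter = 2`, the sink receives `[1, 2, 1, 2, 3]`.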





[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980947315


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java:
##########
@@ -58,20 +58,22 @@ public void close() throws IOException {
     @Override
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        JdbcSourceSplit split = splits.poll();
-        if (null != split) {
-            inputFormat.open(split);
-            while (!inputFormat.reachedEnd()) {
-                SeaTunnelRow seaTunnelRow = inputFormat.nextRecord();
-                output.collect(seaTunnelRow);
+        synchronized (output.getCheckpointLock()) {

Review Comment:
   OK, but it's not ideal; we can optimize it later. Most PRs are currently blocked on the e2e tests.


