You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/27 03:09:36 UTC

[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2904: [hotfix][connector][jdbc] fix JDBC split exception

hailin0 commented on code in PR #2904:
URL: https://github.com/apache/incubator-seatunnel/pull/2904#discussion_r980688377


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java:
##########
@@ -60,12 +60,14 @@ public void close() throws IOException {
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
         JdbcSourceSplit split = splits.poll();

Review Comment:
   Move this into the synchronized code block (line 64).



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java:
##########
@@ -28,47 +28,81 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
-    private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+
+    private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
+
     private JdbcSourceOptions jdbcSourceOptions;
     private final PartitionParameter partitionParameter;
-    private final int parallelism;
 
     public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
-        this.parallelism = enumeratorContext.currentParallelism();
+        this.pendingSplits = new HashMap<>();
     }
 
     @Override
     public void open() {
+        // No connection needs to be opened
+    }
+
+    @Override
+    public void run() throws Exception {
+        discoverySplits();
+        assignPendingSplits();
+    }
+
+    private void discoverySplits() {
+        List<JdbcSourceSplit> allSplit = new ArrayList<>();
         LOG.info("Starting to calculate splits.");
         if (null != partitionParameter) {
             JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
-                    new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism);
+                new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(enumeratorContext.currentParallelism());
             Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues();
             for (int i = 0; i < parameterValues.length; i++) {
                 allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
             }
         } else {
             allSplit.add(new JdbcSourceSplit(null, 0));
         }
+        int numReaders = enumeratorContext.currentParallelism();
+        for (JdbcSourceSplit split : allSplit) {
+            int ownerReader = split.splitId % numReaders;
+            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
         LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
     }
 
-    @Override
-    public void run() throws Exception {
+    private void assignPendingSplits() {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : enumeratorContext.registeredReaders()) {
+            // Remove pending assignment for the reader
+            final Set<JdbcSourceSplit> pendingAssignmentForReader =
+                pendingSplits.remove(pendingReader);

Review Comment:
   How will this be rolled back if an exception is thrown?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org