You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/26 11:22:07 UTC

[incubator-seatunnel] branch dev updated: [Improve][connector-jdbc] Calculate splits only once in JdbcSourceSplitEnumerator (#2900)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7622f2899 [Improve][connector-jdbc] Calculate splits only once in JdbcSourceSplitEnumerator (#2900)
7622f2899 is described below

commit 7622f28999c18ff1090c5bec08ef96d7c6c7eb65
Author: Xiao Zhao <zh...@163.com>
AuthorDate: Mon Sep 26 19:21:59 2022 +0800

    [Improve][connector-jdbc] Calculate splits only once in JdbcSourceSplitEnumerator (#2900)
---
 .../jdbc/source/JdbcSourceSplitEnumerator.java     | 43 +++++++++++++---------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
index f3b5368e9..71de5d176 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -22,6 +22,9 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -29,20 +32,34 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
-
-    SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
-    List<JdbcSourceSplit> allSplit = new ArrayList<>();
-    JdbcSourceOptions jdbcSourceOptions;
-    PartitionParameter partitionParameter;
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
+    private final SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
+    private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+    private JdbcSourceOptions jdbcSourceOptions;
+    private final PartitionParameter partitionParameter;
+    private final int parallelism;
 
     public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
+        this.parallelism = enumeratorContext.currentParallelism();
     }
 
     @Override
     public void open() {
+        LOG.info("Starting to calculate splits.");
+        if (null != partitionParameter) {
+            JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
+                    new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism);
+            Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues();
+            for (int i = 0; i < parameterValues.length; i++) {
+                allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
+            }
+        } else {
+            allSplit.add(new JdbcSourceSplit(null, 0));
+        }
+        LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
     }
 
     @Override
@@ -70,20 +87,10 @@ public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSour
 
     @Override
     public void registerReader(int subtaskId) {
-        int parallelism = enumeratorContext.currentParallelism();
-        if (allSplit.isEmpty()) {
-            if (null != partitionParameter) {
-                JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider = new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism);
-                Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues();
-                for (int i = 0; i < parameterValues.length; i++) {
-                    allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
-                }
-            } else {
-                allSplit.add(new JdbcSourceSplit(null, 0));
-            }
-        }
         // Filter the split that the current task needs to run
-        List<JdbcSourceSplit> splits = allSplit.stream().filter(p -> p.splitId % parallelism == subtaskId).collect(Collectors.toList());
+        List<JdbcSourceSplit> splits = allSplit.stream()
+                .filter(p -> p.splitId % parallelism == subtaskId)
+                .collect(Collectors.toList());
         enumeratorContext.assignSplit(subtaskId, splits);
         enumeratorContext.signalNoMoreSplits(subtaskId);
     }