You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/04/18 08:08:55 UTC

[shardingsphere] branch master updated: Improve pipeline importer job write exception (#25207)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 33ec1efb017 Improve pipeline importer job write exception (#25207)
33ec1efb017 is described below

commit 33ec1efb017f66bada6f97fc14e8ca4b9bc46ca8
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Apr 18 16:08:47 2023 +0800

    Improve pipeline importer job write exception (#25207)
    
    * Improve pipeline importer job write exception
    
    * Add transaction isolation at inventory
    
    * Improve unit test
---
 .../api/config/ingest/InventoryDumperConfiguration.java  |  2 ++
 .../exception/job/PipelineImporterJobWriteException.java |  4 ++--
 .../data/pipeline/core/importer/DataSourceImporter.java  | 16 ++++++++++------
 .../pipeline/core/ingest/dumper/InventoryDumper.java     |  3 +++
 .../consistency/MigrationDataConsistencyCheckerTest.java |  2 ++
 5 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 3be5b12f5ea..8e8f88d5a49 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -44,6 +44,8 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
     
     private String querySQL;
     
+    private Integer transactionIsolation;
+    
     private Integer shardingItem;
     
     private int batchSize = 1000;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
index 762d9ace2b4..b0f832722f7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
@@ -27,7 +27,7 @@ public final class PipelineImporterJobWriteException extends PipelineSQLExceptio
     
     private static final long serialVersionUID = -7924663094479253130L;
     
-    public PipelineImporterJobWriteException() {
-        super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data failed.");
+    public PipelineImporterJobWriteException(final Exception cause) {
+        super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data failed.", cause);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 09f83d9eb24..a6a891d6e1b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -42,7 +42,6 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterCo
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -130,22 +129,27 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         if (null == buffer || buffer.isEmpty()) {
             return;
         }
-        boolean success = tryFlush(dataSource, buffer);
-        ShardingSpherePreconditions.checkState(!isRunning() || success, PipelineImporterJobWriteException::new);
+        try {
+            tryFlush(dataSource, buffer);
+        } catch (final SQLException ex) {
+            throw new PipelineImporterJobWriteException(ex);
+        }
     }
     
     @SneakyThrows(InterruptedException.class)
-    private boolean tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
+    private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
         for (int i = 0; isRunning() && i <= importerConfig.getRetryTimes(); i++) {
             try {
                 doFlush(dataSource, buffer);
-                return true;
+                return;
             } catch (final SQLException ex) {
                 log.error("flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
+                if (i == importerConfig.getRetryTimes()) {
+                    throw ex;
+                }
                 Thread.sleep(Math.min(5 * 60 * 1000L, 1000L << i));
             }
         }
-        return false;
     }
     
     private void doFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 1de123faa15..548eac94a80 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -114,6 +114,9 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
         int batchSize = dumperConfig.getBatchSize();
         DatabaseType databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
+        if (null != dumperConfig.getTransactionIsolation()) {
+            connection.setTransactionIsolation(dumperConfig.getTransactionIsolation());
+        }
         try (PreparedStatement preparedStatement = JDBCStreamQueryUtils.generateStreamQueryPreparedStatement(databaseType, connection, buildInventoryDumpSQL())) {
             dumpStatement = preparedStatement;
             if (!(databaseType instanceof MySQLDatabaseType)) {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 93d0986119f..e05dd7b08c7 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -57,6 +57,8 @@ class MigrationDataConsistencyCheckerTest {
         MigrationJobConfiguration jobConfig = createJobConfiguration();
         JobConfigurationPOJO jobConfigurationPOJO = new JobConfigurationPOJO();
         jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+        jobConfigurationPOJO.setJobName(jobConfig.getJobId());
+        jobConfigurationPOJO.setShardingTotalCount(1);
         GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
         governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");