You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/04/18 08:08:55 UTC
[shardingsphere] branch master updated: Improve pipeline importer job write exception (#25207)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 33ec1efb017 Improve pipeline importer job write exception (#25207)
33ec1efb017 is described below
commit 33ec1efb017f66bada6f97fc14e8ca4b9bc46ca8
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Apr 18 16:08:47 2023 +0800
Improve pipeline importer job write exception (#25207)
* Improve pipeline importer job write exception
* Add transaction isolation setting to the inventory dumper
* Improve unit test
---
.../api/config/ingest/InventoryDumperConfiguration.java | 2 ++
.../exception/job/PipelineImporterJobWriteException.java | 4 ++--
.../data/pipeline/core/importer/DataSourceImporter.java | 16 ++++++++++------
.../pipeline/core/ingest/dumper/InventoryDumper.java | 3 +++
.../consistency/MigrationDataConsistencyCheckerTest.java | 2 ++
5 files changed, 19 insertions(+), 8 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 3be5b12f5ea..8e8f88d5a49 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -44,6 +44,8 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
private String querySQL;
+ private Integer transactionIsolation;
+
private Integer shardingItem;
private int batchSize = 1000;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
index 762d9ace2b4..b0f832722f7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
@@ -27,7 +27,7 @@ public final class PipelineImporterJobWriteException extends PipelineSQLExceptio
private static final long serialVersionUID = -7924663094479253130L;
- public PipelineImporterJobWriteException() {
- super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data failed.");
+ public PipelineImporterJobWriteException(final Exception cause) {
+ super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data failed.", cause);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 09f83d9eb24..a6a891d6e1b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -42,7 +42,6 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterCo
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -130,22 +129,27 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
if (null == buffer || buffer.isEmpty()) {
return;
}
- boolean success = tryFlush(dataSource, buffer);
- ShardingSpherePreconditions.checkState(!isRunning() || success, PipelineImporterJobWriteException::new);
+ try {
+ tryFlush(dataSource, buffer);
+ } catch (final SQLException ex) {
+ throw new PipelineImporterJobWriteException(ex);
+ }
}
@SneakyThrows(InterruptedException.class)
- private boolean tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
+ private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
for (int i = 0; isRunning() && i <= importerConfig.getRetryTimes(); i++) {
try {
doFlush(dataSource, buffer);
- return true;
+ return;
} catch (final SQLException ex) {
log.error("flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
+ if (i == importerConfig.getRetryTimes()) {
+ throw ex;
+ }
Thread.sleep(Math.min(5 * 60 * 1000L, 1000L << i));
}
}
- return false;
}
private void doFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 1de123faa15..548eac94a80 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -114,6 +114,9 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
int batchSize = dumperConfig.getBatchSize();
DatabaseType databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
+ if (null != dumperConfig.getTransactionIsolation()) {
+ connection.setTransactionIsolation(dumperConfig.getTransactionIsolation());
+ }
try (PreparedStatement preparedStatement = JDBCStreamQueryUtils.generateStreamQueryPreparedStatement(databaseType, connection, buildInventoryDumpSQL())) {
dumpStatement = preparedStatement;
if (!(databaseType instanceof MySQLDatabaseType)) {
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 93d0986119f..e05dd7b08c7 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -57,6 +57,8 @@ class MigrationDataConsistencyCheckerTest {
MigrationJobConfiguration jobConfig = createJobConfiguration();
JobConfigurationPOJO jobConfigurationPOJO = new JobConfigurationPOJO();
jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+ jobConfigurationPOJO.setJobName(jobConfig.getJobId());
+ jobConfigurationPOJO.setShardingTotalCount(1);
GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");