You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by yx...@apache.org on 2022/09/29 17:05:17 UTC
[shardingsphere] branch master updated: Refactor PipelineJobHasAlreadyFinishedException (#21275)
This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d15ca41bef3 Refactor PipelineJobHasAlreadyFinishedException (#21275)
d15ca41bef3 is described below
commit d15ca41bef3909a7b8838d9c7c97497dacde6235
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Fri Sep 30 01:05:07 2022 +0800
Refactor PipelineJobHasAlreadyFinishedException (#21275)
* Refactor PipelineJobHasAlreadyFinishedException
* Refactor PipelineJobHasAlreadyFinishedException
---
.../user-manual/error-code/sql-error-code.cn.md | 1 +
.../user-manual/error-code/sql-error-code.en.md | 1 +
.../PipelineJobHasAlreadyFinishedException.java | 6 ++---
.../ConsistencyCheckJobAPIImpl.java | 28 +++++++++-------------
.../general/PostgreSQLMigrationGeneralIT.java | 18 +++++++-------
5 files changed, 24 insertions(+), 30 deletions(-)
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 4813cb33516..c164cde2108 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -115,6 +115,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
| 08000 | 18092 | Get binlog position failed by job \`%s\`, reason is: %s |
| HY000 | 18093 | Can not poll event because of binlog sync channel already closed |
| HY000 | 18094 | Task \`%s\` execute failed |
+| HY000 | 18095 | Job has already finished, please run \`CHECK MIGRATION %s\` to start a new data consistency check job |
### DistSQL
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 170cf4b4b3e..4dea15e1dc9 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -115,6 +115,7 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
| 08000 | 18092 | Get binlog position failed by job \`%s\`, reason is: %s |
| HY000 | 18093 | Can not poll event because of binlog sync channel already closed |
| HY000 | 18094 | Task \`%s\` execute failed |
+| HY000 | 18095 | Job has already finished, please run \`CHECK MIGRATION %s\` to start a new data consistency check job |
### DistSQL
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
index c0835346d6d..8bb7e5bdcd4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
@@ -25,9 +25,9 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpe
*/
public final class PipelineJobHasAlreadyFinishedException extends PipelineSQLException {
- private static final long serialVersionUID = 2854259384634892428L;
+ private static final long serialVersionUID = 6881217592831423520L;
- public PipelineJobHasAlreadyFinishedException(final String message) {
- super(XOpenSQLState.GENERAL_ERROR, 88, message);
+ public PipelineJobHasAlreadyFinishedException(final String jobId) {
+ super(XOpenSQLState.GENERAL_ERROR, 95, "Job has already finished, please run `CHECK MIGRATION %s` to start a new data consistency check job", jobId);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index c4963d50ab6..4ad73f6ee91 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -45,6 +45,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.Collections;
@@ -131,32 +132,25 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void startDisabledJob(final String jobId) {
- log.info("start disable check job {}", jobId);
+ log.info("Start disable check job {}", jobId);
PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
- if (null != jobProgress && JobStatus.FINISHED == jobProgress.getStatus()) {
- throw new PipelineJobHasAlreadyFinishedException(String.format("job already finished, can use `CHECK MIGRATION '%s'` to start a new data consistency check job", jobId));
- }
- super.startDisabledJob(jobId);
+ ShardingSpherePreconditions.checkState(null == jobProgress || JobStatus.FINISHED != jobProgress.getStatus(), () -> new PipelineJobHasAlreadyFinishedException(jobId));
}
@Override
public void startByParentJobId(final String parentJobId) {
- log.info("start check job by parentJobId {}", parentJobId);
- Optional<String> optional = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
- if (!optional.isPresent()) {
- throw new PipelineJobNotFoundException(parentJobId + " check job");
- }
- startDisabledJob(optional.get());
+ log.info("Start check job by parent job id: {}", parentJobId);
+ Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+ ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new PipelineJobNotFoundException(parentJobId));
+ startDisabledJob(checkLatestJobId.get());
}
@Override
public void stopByParentJobId(final String parentJobId) {
- log.info("stop check job by parentJobId {}", parentJobId);
- Optional<String> optional = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
- if (!optional.isPresent()) {
- throw new PipelineJobNotFoundException(parentJobId + " check job");
- }
- stop(optional.get());
+ log.info("Stop check job by parent job id: {}", parentJobId);
+ Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+ ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new PipelineJobNotFoundException(parentJobId));
+ stop(checkLatestJobId.get());
}
@Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index d4c892d8db3..1da9b9c08f7 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -44,10 +44,10 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
/**
- * PostgreSQL general scaling test case. include openGauss type, same process.
+ * PostgreSQL and openGauss general scaling test case.
*/
-@Slf4j
@RunWith(Parameterized.class)
+@Slf4j
public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase {
private static final KeyGenerateAlgorithm KEY_GENERATE_ALGORITHM = new AutoIncrementKeyGenerateAlgorithm();
@@ -117,15 +117,13 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
/*
* TODO Compatible with restart job, before stopping job, incremental_idle_seconds=16, before checking migration, incremental_idle_seconds=23,
- * it just pass 7 seconds, and it's not enough for PostgreSQL incremental task to sync data
+ * it just pass 7 seconds, and it's not enough for PostgreSQL incremental task to sync data
*/
-/*
- stopMigrationByJobId(jobId);
- sourceExecuteWithLog(String.format("INSERT INTO %s.%s (order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME, getSourceTableOrderName(), KEY_GENERATE_ALGORITHM.generateKey(),
- 1, "afterStop"));
- startMigrationByJobId(jobId);
- waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-*/
+// stopMigrationByJobId(jobId);
+// sourceExecuteWithLog(String.format("INSERT INTO %s.%s (order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME, getSourceTableOrderName(), KEY_GENERATE_ALGORITHM.generateKey(),
+// 1, "afterStop"));
+// startMigrationByJobId(jobId);
+// waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
stopMigrationByJobId(jobId);
}