You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by yx...@apache.org on 2022/09/29 17:05:17 UTC

[shardingsphere] branch master updated: Refactor PipelineJobHasAlreadyFinishedException (#21275)

This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d15ca41bef3 Refactor PipelineJobHasAlreadyFinishedException (#21275)
d15ca41bef3 is described below

commit d15ca41bef3909a7b8838d9c7c97497dacde6235
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Fri Sep 30 01:05:07 2022 +0800

    Refactor PipelineJobHasAlreadyFinishedException (#21275)
    
    * Refactor PipelineJobHasAlreadyFinishedException
    
    * Refactor PipelineJobHasAlreadyFinishedException
---
 .../user-manual/error-code/sql-error-code.cn.md    |  1 +
 .../user-manual/error-code/sql-error-code.en.md    |  1 +
 .../PipelineJobHasAlreadyFinishedException.java    |  6 ++---
 .../ConsistencyCheckJobAPIImpl.java                | 28 +++++++++-------------
 .../general/PostgreSQLMigrationGeneralIT.java      | 18 +++++++-------
 5 files changed, 24 insertions(+), 30 deletions(-)

diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 4813cb33516..c164cde2108 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -115,6 +115,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 | 08000     | 18092       | Get binlog position failed by job \`%s\`, reason is: %s |
 | HY000     | 18093       | Can not poll event because of binlog sync channel already closed |
 | HY000     | 18094       | Task \`%s\` execute failed |
+| HY000     | 18095       | Job has already finished, please run \`CHECK MIGRATION %s\` to start a new data consistency check job |
 
 ### DistSQL
 
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 170cf4b4b3e..4dea15e1dc9 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -115,6 +115,7 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
 | 08000     | 18092       | Get binlog position failed by job \`%s\`, reason is: %s |
 | HY000     | 18093       | Can not poll event because of binlog sync channel already closed |
 | HY000     | 18094       | Task \`%s\` execute failed |
+| HY000     | 18095       | Job has already finished, please run \`CHECK MIGRATION %s\` to start a new data consistency check job |
 
 ### DistSQL
 
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
index c0835346d6d..8bb7e5bdcd4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
@@ -25,9 +25,9 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpe
  */
 public final class PipelineJobHasAlreadyFinishedException extends PipelineSQLException {
     
-    private static final long serialVersionUID = 2854259384634892428L;
+    private static final long serialVersionUID = 6881217592831423520L;
     
-    public PipelineJobHasAlreadyFinishedException(final String message) {
-        super(XOpenSQLState.GENERAL_ERROR, 88, message);
+    public PipelineJobHasAlreadyFinishedException(final String jobId) {
+        super(XOpenSQLState.GENERAL_ERROR, 95, "Job has already finished, please run `CHECK MIGRATION %s` to start a new data consistency check job", jobId);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index c4963d50ab6..4ad73f6ee91 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -45,6 +45,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.util.Collections;
@@ -131,32 +132,26 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     
     @Override
     public void startDisabledJob(final String jobId) {
-        log.info("start disable check job {}", jobId);
+        log.info("Start disable check job {}", jobId);
         PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
-        if (null != jobProgress && JobStatus.FINISHED == jobProgress.getStatus()) {
-            throw new PipelineJobHasAlreadyFinishedException(String.format("job already finished, can use `CHECK MIGRATION '%s'` to start a new data consistency check job", jobId));
-        }
+        ShardingSpherePreconditions.checkState(null == jobProgress || JobStatus.FINISHED != jobProgress.getStatus(), () -> new PipelineJobHasAlreadyFinishedException(jobId));
         super.startDisabledJob(jobId);
     }
     
     @Override
     public void startByParentJobId(final String parentJobId) {
-        log.info("start check job by parentJobId {}", parentJobId);
-        Optional<String> optional = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
-        if (!optional.isPresent()) {
-            throw new PipelineJobNotFoundException(parentJobId + " check job");
-        }
-        startDisabledJob(optional.get());
+        log.info("Start check job by parent job id: {}", parentJobId);
+        Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+        ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new PipelineJobNotFoundException(parentJobId));
+        startDisabledJob(checkLatestJobId.get());
     }
     
     @Override
     public void stopByParentJobId(final String parentJobId) {
-        log.info("stop check job by parentJobId {}", parentJobId);
-        Optional<String> optional = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
-        if (!optional.isPresent()) {
-            throw new PipelineJobNotFoundException(parentJobId + " check job");
-        }
-        stop(optional.get());
+        log.info("Stop check job by parent job id: {}", parentJobId);
+        Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+        ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new PipelineJobNotFoundException(parentJobId));
+        stop(checkLatestJobId.get());
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index d4c892d8db3..1da9b9c08f7 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -44,10 +44,10 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
- * PostgreSQL general scaling test case. include openGauss type, same process.
+ * PostgreSQL and openGauss general scaling test case.
  */
-@Slf4j
 @RunWith(Parameterized.class)
+@Slf4j
 public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase {
     
     private static final KeyGenerateAlgorithm KEY_GENERATE_ALGORITHM = new AutoIncrementKeyGenerateAlgorithm();
@@ -117,15 +117,13 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         /*
          * TODO Compatible with restart job, before stopping job, incremental_idle_seconds=16, before checking migration, incremental_idle_seconds=23,
-         *   it just pass 7 seconds, and it's not enough for PostgreSQL incremental task to sync data
+         * it just pass 7 seconds, and it's not enough for PostgreSQL incremental task to sync data
          */
-/*
-        stopMigrationByJobId(jobId);
-        sourceExecuteWithLog(String.format("INSERT INTO %s.%s (order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME, getSourceTableOrderName(), KEY_GENERATE_ALGORITHM.generateKey(),
-                1, "afterStop"));
-        startMigrationByJobId(jobId);
-        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-*/
+//        stopMigrationByJobId(jobId);
+//        sourceExecuteWithLog(String.format("INSERT INTO %s.%s (order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME, getSourceTableOrderName(), KEY_GENERATE_ALGORITHM.generateKey(),
+//                1, "afterStop"));
+//        startMigrationByJobId(jobId);
+//        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         stopMigrationByJobId(jobId);
     }