You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/11 13:07:11 UTC

[shardingsphere] branch master updated: Decouple RuleAlteredJobAPI and SingletonSPI (#16743)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c76308d29c9 Decouple RuleAlteredJobAPI and SingletonSPI (#16743)
c76308d29c9 is described below

commit c76308d29c92c2745a8678ba0b61fd84ef04091c
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Mon Apr 11 21:06:59 2022 +0800

    Decouple RuleAlteredJobAPI and SingletonSPI (#16743)
---
 .../handler/query/CheckScalingQueryResultSet.java  |  2 +-
 .../ShowScalingCheckAlgorithmsQueryResultSet.java  |  2 +-
 .../query/ShowScalingJobStatusQueryResultSet.java  |  2 +-
 .../query/ShowScalingListQueryResultSet.java       |  2 +-
 .../handler/update/ApplyScalingUpdater.java        |  2 +-
 .../distsql/handler/update/DropScalingUpdater.java |  2 +-
 .../handler/update/ResetScalingUpdater.java        |  2 +-
 .../update/RestoreScalingSourceWritingUpdater.java |  2 +-
 .../handler/update/StartScalingUpdater.java        |  2 +-
 .../update/StopScalingSourceWritingUpdater.java    |  2 +-
 .../distsql/handler/update/StopScalingUpdater.java |  2 +-
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  9 ++++---
 .../core/api/impl/RuleAlteredJobAPIImpl.java       | 28 +++++++++++-----------
 .../exception/PipelineVerifyFailedException.java   | 20 +---------------
 .../data/pipeline/core/job/FinishedCheckJob.java   |  2 +-
 .../scenario/rulealtered/RuleAlteredJobWorker.java |  6 ++---
 .../spi/DefaultSourceWritingStopAlgorithm.java     |  2 +-
 .../data/pipeline/api/PipelineJobAPIFactory.java   | 10 ++++----
 .../data/pipeline/api/RuleAlteredJobAPI.java       |  3 +--
 .../api/impl/RuleAlteredJobAPIImplTest.java        |  2 +-
 .../pipeline/core/job/FinishedCheckJobTest.java    |  8 +++----
 21 files changed, 47 insertions(+), 65 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
index c654977d640..5858521f6bd 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
@@ -37,7 +37,7 @@ import java.util.stream.Collectors;
  */
 public final class CheckScalingQueryResultSet implements DistSQLResultSet {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     private Iterator<Collection<Object>> data;
     
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingCheckAlgorithmsQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingCheckAlgorithmsQueryResultSet.java
index 375b538a3e7..0d9069abcd6 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingCheckAlgorithmsQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingCheckAlgorithmsQueryResultSet.java
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
  */
 public final class ShowScalingCheckAlgorithmsQueryResultSet implements DistSQLResultSet {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     private Iterator<Collection<Object>> data;
     
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
index 9e16ac70a56..24b7af8a5ce 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
  */
 public final class ShowScalingJobStatusQueryResultSet implements DistSQLResultSet {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     private Iterator<Collection<Object>> data;
     
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingListQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingListQueryResultSet.java
index a50f49ff9ce..a96bb5bca51 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingListQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingListQueryResultSet.java
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
  */
 public final class ShowScalingListQueryResultSet implements DistSQLResultSet {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     private Iterator<Collection<Object>> data;
     
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ApplyScalingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ApplyScalingUpdater.java
index 9b3d09e6893..32d66cb2d66 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ApplyScalingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ApplyScalingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.ApplyScalingStatement
  */
 public final class ApplyScalingUpdater implements RALUpdater<ApplyScalingStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     @Override
     public void executeUpdate(final ApplyScalingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/DropScalingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/DropScalingUpdater.java
index 7e4bdc1bbea..672b5653126 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/DropScalingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/DropScalingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.DropScalingStatement;
  */
 public final class DropScalingUpdater implements RALUpdater<DropScalingStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     @Override
     public void executeUpdate(final DropScalingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ResetScalingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ResetScalingUpdater.java
index 14afe802de1..762f3673a12 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ResetScalingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ResetScalingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.ResetScalingStatement
  */
 public final class ResetScalingUpdater implements RALUpdater<ResetScalingStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     @Override
     public void executeUpdate(final ResetScalingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/RestoreScalingSourceWritingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/RestoreScalingSourceWritingUpdater.java
index 42e79dafad4..7c7278290ec 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/RestoreScalingSourceWritingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/RestoreScalingSourceWritingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.RestoreScalingSourceW
  */
 public final class RestoreScalingSourceWritingUpdater implements RALUpdater<RestoreScalingSourceWritingStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     @Override
     public void executeUpdate(final RestoreScalingSourceWritingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StartScalingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StartScalingUpdater.java
index a847b2c6dda..2a4d1eec11b 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StartScalingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StartScalingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.StartScalingStatement
  */
 public final class StartScalingUpdater implements RALUpdater<StartScalingStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     @Override
     public void executeUpdate(final StartScalingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingSourceWritingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingSourceWritingUpdater.java
index 1535bde7403..dea19157d16 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingSourceWritingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingSourceWritingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.StopScalingSourceWrit
  */
 public final class StopScalingSourceWritingUpdater implements RALUpdater<StopScalingSourceWritingStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     @Override
     public void executeUpdate(final StopScalingSourceWritingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingUpdater.java
index e0198434f2c..87387eeb7f3 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.StopScalingStatement;
  */
 public final class StopScalingUpdater implements RALUpdater<StopScalingStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
     
     @Override
     public void executeUpdate(final StopScalingStatement sqlStatement) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 27453fa4330..1867c5bcb98 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -27,6 +27,9 @@ import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 
+/**
+ * Abstract pipeline job API impl.
+ */
 @Slf4j
 public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     
@@ -65,7 +68,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
     }
     
-    protected JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
+    protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
         JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
         if (null == result) {
             throw new PipelineJobNotFoundException(String.format("Can not find scaling job %s", jobId), jobId);
@@ -73,13 +76,13 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         return result;
     }
     
-    protected void verifyJobNotStopped(final JobConfigurationPOJO jobConfigPOJO) {
+    protected final void verifyJobNotStopped(final JobConfigurationPOJO jobConfigPOJO) {
         if (jobConfigPOJO.isDisabled()) {
             throw new PipelineVerifyFailedException("Job is stopped, it's not necessary to do it.");
         }
     }
     
-    protected void verifyJobStopped(final JobConfigurationPOJO jobConfigPOJO) {
+    protected final void verifyJobStopped(final JobConfigurationPOJO jobConfigPOJO) {
         if (!jobConfigPOJO.isDisabled()) {
             throw new PipelineVerifyFailedException("Job is not stopped. You could run `STOP SCALING {jobId}` to stop it.");
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 6e8b776838a..d47fdcd435e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -72,6 +72,9 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+/**
+ * Rule altered job API impl.
+ */
 @Slf4j
 public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl implements RuleAlteredJobAPI {
     
@@ -140,8 +143,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     @Override
     public Map<Integer, JobProgress> getProgress(final String jobId) {
         checkModeConfig();
-        JobConfiguration jobConfig = getJobConfig(jobId);
-        return getProgress(jobConfig);
+        return getProgress(getJobConfig(jobId));
     }
     
     @Override
@@ -265,18 +267,12 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         return null != ruleAlteredContext.getDataConsistencyCheckAlgorithm();
     }
     
-    private void verifyDataConsistencyCheck(final JobConfigurationPOJO jobConfigPOJO, final JobConfiguration jobConfig) {
-        verifyManualMode(jobConfig);
-        verifySourceWritingStopped(jobConfig);
-    }
-    
     @Override
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
         checkModeConfig();
         log.info("Data consistency check for job {}", jobId);
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
-        verifyDataConsistencyCheck(jobConfigPOJO, jobConfig);
+        JobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
+        verifyDataConsistencyCheck(jobConfig);
         return dataConsistencyCheck(jobConfig);
     }
     
@@ -294,14 +290,18 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType) {
         checkModeConfig();
         log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
-        verifyDataConsistencyCheck(jobConfigPOJO, jobConfig);
+        JobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
+        verifyDataConsistencyCheck(jobConfig);
         TypedSPIConfiguration typedSPIConfig = new ShardingSphereAlgorithmConfiguration(algorithmType, new Properties());
         DataConsistencyCheckAlgorithm checkAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(typedSPIConfig, DataConsistencyCheckAlgorithm.class);
         return dataConsistencyCheck0(jobConfig, checkAlgorithm);
     }
     
+    private void verifyDataConsistencyCheck(final JobConfiguration jobConfig) {
+        verifyManualMode(jobConfig);
+        verifySourceWritingStopped(jobConfig);
+    }
+    
     private Map<String, DataConsistencyCheckResult> dataConsistencyCheck0(final JobConfiguration jobConfig, final DataConsistencyCheckAlgorithm checkAlgorithm) {
         String jobId = jobConfig.getHandleConfig().getJobId();
         DataConsistencyChecker dataConsistencyChecker = EnvironmentCheckerFactory.newInstance(jobConfig);
@@ -364,7 +364,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         stop(jobId);
         // TODO clean up should be done after the task is complete.
         try {
-            TimeUnit.SECONDS.sleep(1);
+            TimeUnit.SECONDS.sleep(1L);
         } catch (final InterruptedException ex) {
             log.error(ex.getMessage());
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineVerifyFailedException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineVerifyFailedException.java
index c584e8da0b6..71ba8236da6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineVerifyFailedException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineVerifyFailedException.java
@@ -17,32 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception;
 
-import lombok.Getter;
-
 /**
  * Pipeline verify failed exception.
  */
-@Getter
 public final class PipelineVerifyFailedException extends RuntimeException {
     
-    private static final long serialVersionUID = 1L;
-    
-    public PipelineVerifyFailedException() {
-    }
+    private static final long serialVersionUID = 2854259384634892428L;
     
     public PipelineVerifyFailedException(final String message) {
         super(message);
     }
-    
-    public PipelineVerifyFailedException(final String message, final Throwable cause) {
-        super(message, cause);
-    }
-    
-    public PipelineVerifyFailedException(final Throwable cause) {
-        super(cause);
-    }
-    
-    public PipelineVerifyFailedException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index e6fc10dc884..a6ca54c3a42 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -40,7 +40,7 @@ import java.util.Map;
 @Slf4j
 public final class FinishedCheckJob implements SimpleJob {
     
-    private final RuleAlteredJobAPI ruleAlteredJobAPI = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private final RuleAlteredJobAPI ruleAlteredJobAPI = PipelineJobAPIFactory.newInstance();
     
     // TODO only one proxy node could do data consistency check in proxy cluster
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index baf115eab29..95f9c300439 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -176,7 +176,7 @@ public final class RuleAlteredJobWorker {
         }
         Optional<JobConfiguration> jobConfigOptional = createJobConfig(event);
         if (jobConfigOptional.isPresent()) {
-            PipelineJobAPIFactory.getRuleAlteredJobAPI().start(jobConfigOptional.get());
+            PipelineJobAPIFactory.newInstance().start(jobConfigOptional.get());
         } else {
             log.info("Switch rule configuration immediately.");
             ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getSchemaName(), event.getActiveVersion(), event.getNewVersion());
@@ -291,8 +291,8 @@ public final class RuleAlteredJobWorker {
     
     private boolean isUncompletedJobOfSameSchemaInJobList(final String schema) {
         boolean isUncompletedJobOfSameSchema = false;
-        for (JobInfo each : PipelineJobAPIFactory.getRuleAlteredJobAPI().list()) {
-            if (PipelineJobAPIFactory.getRuleAlteredJobAPI().getProgress(each.getJobId()).values().stream()
+        for (JobInfo each : PipelineJobAPIFactory.newInstance().list()) {
+            if (PipelineJobAPIFactory.newInstance().getProgress(each.getJobId()).values().stream()
                     .allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) {
                 continue;
             }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopAlgorithm.java
index 98d4e3773a3..7a9bff71e23 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopAlgorithm.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLockAlgorithm
 @Slf4j
 public final class DefaultSourceWritingStopAlgorithm implements RowBasedJobLockAlgorithm {
     
-    private final RuleAlteredJobAPI ruleAlteredJobAPI = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+    private final RuleAlteredJobAPI ruleAlteredJobAPI = PipelineJobAPIFactory.newInstance();
     
     @Override
     public void lock(final String schemaName, final String jobId) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPIFactory.java
index 30f4d2a8407..0f1ea5eadad 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPIFactory.java
@@ -29,14 +29,12 @@ public final class PipelineJobAPIFactory {
         ShardingSphereServiceLoader.register(RuleAlteredJobAPI.class);
     }
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = RequiredSPIRegistry.getRegisteredService(RuleAlteredJobAPI.class);
-    
     /**
-     * Get {@linkplain RuleAlteredJobAPI}.
+     * Create new instance of pipeline job API.
      *
-     * @return pipeline job API
+     * @return new instance of pipeline job API
      */
-    public static RuleAlteredJobAPI getRuleAlteredJobAPI() {
-        return RULE_ALTERED_JOB_API;
+    public static RuleAlteredJobAPI newInstance() {
+        return RequiredSPIRegistry.getRegisteredService(RuleAlteredJobAPI.class);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index 0a56f0591f4..8a31e1ff43b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import org.apache.shardingsphere.spi.type.required.RequiredSPI;
-import org.apache.shardingsphere.spi.type.singleton.SingletonSPI;
 
 import java.util.Collection;
 import java.util.List;
@@ -33,7 +32,7 @@ import java.util.Optional;
 /**
  * Rule altered job API.
  */
-public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI, SingletonSPI {
+public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
     
     /**
      * List all jobs.
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/RuleAlteredJobAPIImplTest.java
index cb52cbafede..4e311534f1a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/RuleAlteredJobAPIImplTest.java
@@ -58,7 +58,7 @@ public final class RuleAlteredJobAPIImplTest {
     @BeforeClass
     public static void beforeClass() {
         PipelineContextUtil.mockModeConfigAndContextManager();
-        ruleAlteredJobAPI = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+        ruleAlteredJobAPI = PipelineJobAPIFactory.newInstance();
     }
     
     @Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
index 96560b99007..05899ad74f5 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
@@ -59,9 +59,9 @@ public final class FinishedCheckJobTest {
     
     @Test
     public void assertExecuteAllDisabledJob() {
-        Optional<String> jobId = PipelineJobAPIFactory.getRuleAlteredJobAPI().start(JobConfigurationBuilder.createJobConfiguration());
+        Optional<String> jobId = PipelineJobAPIFactory.newInstance().start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        List<JobInfo> jobInfos = PipelineJobAPIFactory.getRuleAlteredJobAPI().list();
+        List<JobInfo> jobInfos = PipelineJobAPIFactory.newInstance().list();
         jobInfos.forEach(each -> each.setActive(false));
         when(ruleAlteredJobAPI.list()).thenReturn(jobInfos);
         finishedCheckJob.execute(null);
@@ -69,9 +69,9 @@ public final class FinishedCheckJobTest {
     
     @Test
     public void assertExecuteActiveJob() {
-        Optional<String> jobId = PipelineJobAPIFactory.getRuleAlteredJobAPI().start(JobConfigurationBuilder.createJobConfiguration());
+        Optional<String> jobId = PipelineJobAPIFactory.newInstance().start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        List<JobInfo> jobInfos = PipelineJobAPIFactory.getRuleAlteredJobAPI().list();
+        List<JobInfo> jobInfos = PipelineJobAPIFactory.newInstance().list();
         jobInfos.forEach(each -> each.setActive(true));
         when(ruleAlteredJobAPI.list()).thenReturn(jobInfos);
         finishedCheckJob.execute(null);