You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/11 13:07:11 UTC
[shardingsphere] branch master updated: Decouple RuleAlteredJobAPI and SingletonSPI (#16743)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c76308d29c9 Decouple RuleAlteredJobAPI and SingletonSPI (#16743)
c76308d29c9 is described below
commit c76308d29c92c2745a8678ba0b61fd84ef04091c
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Mon Apr 11 21:06:59 2022 +0800
Decouple RuleAlteredJobAPI and SingletonSPI (#16743)
---
.../handler/query/CheckScalingQueryResultSet.java | 2 +-
.../ShowScalingCheckAlgorithmsQueryResultSet.java | 2 +-
.../query/ShowScalingJobStatusQueryResultSet.java | 2 +-
.../query/ShowScalingListQueryResultSet.java | 2 +-
.../handler/update/ApplyScalingUpdater.java | 2 +-
.../distsql/handler/update/DropScalingUpdater.java | 2 +-
.../handler/update/ResetScalingUpdater.java | 2 +-
.../update/RestoreScalingSourceWritingUpdater.java | 2 +-
.../handler/update/StartScalingUpdater.java | 2 +-
.../update/StopScalingSourceWritingUpdater.java | 2 +-
.../distsql/handler/update/StopScalingUpdater.java | 2 +-
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 9 ++++---
.../core/api/impl/RuleAlteredJobAPIImpl.java | 28 +++++++++++-----------
.../exception/PipelineVerifyFailedException.java | 20 +---------------
.../data/pipeline/core/job/FinishedCheckJob.java | 2 +-
.../scenario/rulealtered/RuleAlteredJobWorker.java | 6 ++---
.../spi/DefaultSourceWritingStopAlgorithm.java | 2 +-
.../data/pipeline/api/PipelineJobAPIFactory.java | 10 ++++----
.../data/pipeline/api/RuleAlteredJobAPI.java | 3 +--
.../api/impl/RuleAlteredJobAPIImplTest.java | 2 +-
.../pipeline/core/job/FinishedCheckJobTest.java | 8 +++----
21 files changed, 47 insertions(+), 65 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
index c654977d640..5858521f6bd 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
@@ -37,7 +37,7 @@ import java.util.stream.Collectors;
*/
public final class CheckScalingQueryResultSet implements DistSQLResultSet {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
private Iterator<Collection<Object>> data;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingCheckAlgorithmsQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingCheckAlgorithmsQueryResultSet.java
index 375b538a3e7..0d9069abcd6 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingCheckAlgorithmsQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingCheckAlgorithmsQueryResultSet.java
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
*/
public final class ShowScalingCheckAlgorithmsQueryResultSet implements DistSQLResultSet {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
private Iterator<Collection<Object>> data;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
index 9e16ac70a56..24b7af8a5ce 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
*/
public final class ShowScalingJobStatusQueryResultSet implements DistSQLResultSet {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
private Iterator<Collection<Object>> data;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingListQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingListQueryResultSet.java
index a50f49ff9ce..a96bb5bca51 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingListQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingListQueryResultSet.java
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
*/
public final class ShowScalingListQueryResultSet implements DistSQLResultSet {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
private Iterator<Collection<Object>> data;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ApplyScalingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ApplyScalingUpdater.java
index 9b3d09e6893..32d66cb2d66 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ApplyScalingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ApplyScalingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.ApplyScalingStatement
*/
public final class ApplyScalingUpdater implements RALUpdater<ApplyScalingStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
@Override
public void executeUpdate(final ApplyScalingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/DropScalingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/DropScalingUpdater.java
index 7e4bdc1bbea..672b5653126 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/DropScalingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/DropScalingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.DropScalingStatement;
*/
public final class DropScalingUpdater implements RALUpdater<DropScalingStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
@Override
public void executeUpdate(final DropScalingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ResetScalingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ResetScalingUpdater.java
index 14afe802de1..762f3673a12 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ResetScalingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/ResetScalingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.ResetScalingStatement
*/
public final class ResetScalingUpdater implements RALUpdater<ResetScalingStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
@Override
public void executeUpdate(final ResetScalingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/RestoreScalingSourceWritingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/RestoreScalingSourceWritingUpdater.java
index 42e79dafad4..7c7278290ec 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/RestoreScalingSourceWritingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/RestoreScalingSourceWritingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.RestoreScalingSourceW
*/
public final class RestoreScalingSourceWritingUpdater implements RALUpdater<RestoreScalingSourceWritingStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
@Override
public void executeUpdate(final RestoreScalingSourceWritingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StartScalingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StartScalingUpdater.java
index a847b2c6dda..2a4d1eec11b 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StartScalingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StartScalingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.StartScalingStatement
*/
public final class StartScalingUpdater implements RALUpdater<StartScalingStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
@Override
public void executeUpdate(final StartScalingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingSourceWritingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingSourceWritingUpdater.java
index 1535bde7403..dea19157d16 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingSourceWritingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingSourceWritingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.StopScalingSourceWrit
*/
public final class StopScalingSourceWritingUpdater implements RALUpdater<StopScalingSourceWritingStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
@Override
public void executeUpdate(final StopScalingSourceWritingStatement sqlStatement) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingUpdater.java
index e0198434f2c..87387eeb7f3 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/update/StopScalingUpdater.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.distsql.statement.StopScalingStatement;
*/
public final class StopScalingUpdater implements RALUpdater<StopScalingStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = PipelineJobAPIFactory.newInstance();
@Override
public void executeUpdate(final StopScalingStatement sqlStatement) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 27453fa4330..1867c5bcb98 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -27,6 +27,9 @@ import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+/**
+ * Abstract pipeline job API impl.
+ */
@Slf4j
public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
@@ -65,7 +68,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
}
- protected JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
+ protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
if (null == result) {
throw new PipelineJobNotFoundException(String.format("Can not find scaling job %s", jobId), jobId);
@@ -73,13 +76,13 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
return result;
}
- protected void verifyJobNotStopped(final JobConfigurationPOJO jobConfigPOJO) {
+ protected final void verifyJobNotStopped(final JobConfigurationPOJO jobConfigPOJO) {
if (jobConfigPOJO.isDisabled()) {
throw new PipelineVerifyFailedException("Job is stopped, it's not necessary to do it.");
}
}
- protected void verifyJobStopped(final JobConfigurationPOJO jobConfigPOJO) {
+ protected final void verifyJobStopped(final JobConfigurationPOJO jobConfigPOJO) {
if (!jobConfigPOJO.isDisabled()) {
throw new PipelineVerifyFailedException("Job is not stopped. You could run `STOP SCALING {jobId}` to stop it.");
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 6e8b776838a..d47fdcd435e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -72,6 +72,9 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+/**
+ * Rule altered job API impl.
+ */
@Slf4j
public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl implements RuleAlteredJobAPI {
@@ -140,8 +143,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
@Override
public Map<Integer, JobProgress> getProgress(final String jobId) {
checkModeConfig();
- JobConfiguration jobConfig = getJobConfig(jobId);
- return getProgress(jobConfig);
+ return getProgress(getJobConfig(jobId));
}
@Override
@@ -265,18 +267,12 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
return null != ruleAlteredContext.getDataConsistencyCheckAlgorithm();
}
- private void verifyDataConsistencyCheck(final JobConfigurationPOJO jobConfigPOJO, final JobConfiguration jobConfig) {
- verifyManualMode(jobConfig);
- verifySourceWritingStopped(jobConfig);
- }
-
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
checkModeConfig();
log.info("Data consistency check for job {}", jobId);
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
- verifyDataConsistencyCheck(jobConfigPOJO, jobConfig);
+ JobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
+ verifyDataConsistencyCheck(jobConfig);
return dataConsistencyCheck(jobConfig);
}
@@ -294,14 +290,18 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType) {
checkModeConfig();
log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
- verifyDataConsistencyCheck(jobConfigPOJO, jobConfig);
+ JobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
+ verifyDataConsistencyCheck(jobConfig);
TypedSPIConfiguration typedSPIConfig = new ShardingSphereAlgorithmConfiguration(algorithmType, new Properties());
DataConsistencyCheckAlgorithm checkAlgorithm = ShardingSphereAlgorithmFactory.createAlgorithm(typedSPIConfig, DataConsistencyCheckAlgorithm.class);
return dataConsistencyCheck0(jobConfig, checkAlgorithm);
}
+ private void verifyDataConsistencyCheck(final JobConfiguration jobConfig) {
+ verifyManualMode(jobConfig);
+ verifySourceWritingStopped(jobConfig);
+ }
+
private Map<String, DataConsistencyCheckResult> dataConsistencyCheck0(final JobConfiguration jobConfig, final DataConsistencyCheckAlgorithm checkAlgorithm) {
String jobId = jobConfig.getHandleConfig().getJobId();
DataConsistencyChecker dataConsistencyChecker = EnvironmentCheckerFactory.newInstance(jobConfig);
@@ -364,7 +364,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
stop(jobId);
// TODO clean up should be done after the task is complete.
try {
- TimeUnit.SECONDS.sleep(1);
+ TimeUnit.SECONDS.sleep(1L);
} catch (final InterruptedException ex) {
log.error(ex.getMessage());
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineVerifyFailedException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineVerifyFailedException.java
index c584e8da0b6..71ba8236da6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineVerifyFailedException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineVerifyFailedException.java
@@ -17,32 +17,14 @@
package org.apache.shardingsphere.data.pipeline.core.exception;
-import lombok.Getter;
-
/**
* Pipeline verify failed exception.
*/
-@Getter
public final class PipelineVerifyFailedException extends RuntimeException {
- private static final long serialVersionUID = 1L;
-
- public PipelineVerifyFailedException() {
- }
+ private static final long serialVersionUID = 2854259384634892428L;
public PipelineVerifyFailedException(final String message) {
super(message);
}
-
- public PipelineVerifyFailedException(final String message, final Throwable cause) {
- super(message, cause);
- }
-
- public PipelineVerifyFailedException(final Throwable cause) {
- super(cause);
- }
-
- public PipelineVerifyFailedException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index e6fc10dc884..a6ca54c3a42 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -40,7 +40,7 @@ import java.util.Map;
@Slf4j
public final class FinishedCheckJob implements SimpleJob {
- private final RuleAlteredJobAPI ruleAlteredJobAPI = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private final RuleAlteredJobAPI ruleAlteredJobAPI = PipelineJobAPIFactory.newInstance();
// TODO only one proxy node could do data consistency check in proxy cluster
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index baf115eab29..95f9c300439 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -176,7 +176,7 @@ public final class RuleAlteredJobWorker {
}
Optional<JobConfiguration> jobConfigOptional = createJobConfig(event);
if (jobConfigOptional.isPresent()) {
- PipelineJobAPIFactory.getRuleAlteredJobAPI().start(jobConfigOptional.get());
+ PipelineJobAPIFactory.newInstance().start(jobConfigOptional.get());
} else {
log.info("Switch rule configuration immediately.");
ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getSchemaName(), event.getActiveVersion(), event.getNewVersion());
@@ -291,8 +291,8 @@ public final class RuleAlteredJobWorker {
private boolean isUncompletedJobOfSameSchemaInJobList(final String schema) {
boolean isUncompletedJobOfSameSchema = false;
- for (JobInfo each : PipelineJobAPIFactory.getRuleAlteredJobAPI().list()) {
- if (PipelineJobAPIFactory.getRuleAlteredJobAPI().getProgress(each.getJobId()).values().stream()
+ for (JobInfo each : PipelineJobAPIFactory.newInstance().list()) {
+ if (PipelineJobAPIFactory.newInstance().getProgress(each.getJobId()).values().stream()
.allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) {
continue;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopAlgorithm.java
index 98d4e3773a3..7a9bff71e23 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/DefaultSourceWritingStopAlgorithm.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLockAlgorithm
@Slf4j
public final class DefaultSourceWritingStopAlgorithm implements RowBasedJobLockAlgorithm {
- private final RuleAlteredJobAPI ruleAlteredJobAPI = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ private final RuleAlteredJobAPI ruleAlteredJobAPI = PipelineJobAPIFactory.newInstance();
@Override
public void lock(final String schemaName, final String jobId) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPIFactory.java
index 30f4d2a8407..0f1ea5eadad 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPIFactory.java
@@ -29,14 +29,12 @@ public final class PipelineJobAPIFactory {
ShardingSphereServiceLoader.register(RuleAlteredJobAPI.class);
}
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = RequiredSPIRegistry.getRegisteredService(RuleAlteredJobAPI.class);
-
/**
- * Get {@linkplain RuleAlteredJobAPI}.
+ * Create new instance of pipeline job API.
*
- * @return pipeline job API
+ * @return new instance of pipeline job API
*/
- public static RuleAlteredJobAPI getRuleAlteredJobAPI() {
- return RULE_ALTERED_JOB_API;
+ public static RuleAlteredJobAPI newInstance() {
+ return RequiredSPIRegistry.getRegisteredService(RuleAlteredJobAPI.class);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index 0a56f0591f4..8a31e1ff43b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.spi.type.required.RequiredSPI;
-import org.apache.shardingsphere.spi.type.singleton.SingletonSPI;
import java.util.Collection;
import java.util.List;
@@ -33,7 +32,7 @@ import java.util.Optional;
/**
* Rule altered job API.
*/
-public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI, SingletonSPI {
+public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
/**
* List all jobs.
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/RuleAlteredJobAPIImplTest.java
index cb52cbafede..4e311534f1a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/RuleAlteredJobAPIImplTest.java
@@ -58,7 +58,7 @@ public final class RuleAlteredJobAPIImplTest {
@BeforeClass
public static void beforeClass() {
PipelineContextUtil.mockModeConfigAndContextManager();
- ruleAlteredJobAPI = PipelineJobAPIFactory.getRuleAlteredJobAPI();
+ ruleAlteredJobAPI = PipelineJobAPIFactory.newInstance();
}
@Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
index 96560b99007..05899ad74f5 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
@@ -59,9 +59,9 @@ public final class FinishedCheckJobTest {
@Test
public void assertExecuteAllDisabledJob() {
- Optional<String> jobId = PipelineJobAPIFactory.getRuleAlteredJobAPI().start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = PipelineJobAPIFactory.newInstance().start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- List<JobInfo> jobInfos = PipelineJobAPIFactory.getRuleAlteredJobAPI().list();
+ List<JobInfo> jobInfos = PipelineJobAPIFactory.newInstance().list();
jobInfos.forEach(each -> each.setActive(false));
when(ruleAlteredJobAPI.list()).thenReturn(jobInfos);
finishedCheckJob.execute(null);
@@ -69,9 +69,9 @@ public final class FinishedCheckJobTest {
@Test
public void assertExecuteActiveJob() {
- Optional<String> jobId = PipelineJobAPIFactory.getRuleAlteredJobAPI().start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = PipelineJobAPIFactory.newInstance().start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- List<JobInfo> jobInfos = PipelineJobAPIFactory.getRuleAlteredJobAPI().list();
+ List<JobInfo> jobInfos = PipelineJobAPIFactory.newInstance().list();
jobInfos.forEach(each -> each.setActive(true));
when(ruleAlteredJobAPI.list()).thenReturn(jobInfos);
finishedCheckJob.execute(null);