You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/02/23 06:50:45 UTC

[shardingsphere] branch master updated: FinishedCheckJob ignore jobs which status is not EXECUTE_INCREMENTAL_TASK (#15565)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a5938a4  FinishedCheckJob ignore jobs which status is not EXECUTE_INCREMENTAL_TASK (#15565)
a5938a4 is described below

commit a5938a4729dc66165910e231ce3c5e2d015117ff
Author: ReyYang <yi...@163.com>
AuthorDate: Wed Feb 23 14:49:37 2022 +0800

    FinishedCheckJob ignore jobs which status is not EXECUTE_INCREMENTAL_TASK (#15565)
---
 .../data/pipeline/core/job/FinishedCheckJob.java        | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 851c7ee..e6fc10d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -23,6 +23,8 @@ import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
@@ -49,6 +51,9 @@ public final class FinishedCheckJob implements SimpleJob {
                 continue;
             }
             String jobId = jobInfo.getJobId();
+            if (isNotAllowDataCheck(jobId)) {
+                continue;
+            }
             try {
                 // TODO refactor: dispatch to different job types
                 JobConfiguration jobConfig = YamlEngine.unmarshal(jobInfo.getJobParameter(), JobConfiguration.class, true);
@@ -93,6 +98,18 @@ public final class FinishedCheckJob implements SimpleJob {
         }
     }
     
+    // Returns true (skip the data check) when any progress entry is null or not in EXECUTE_INCREMENTAL_TASK status.
+    private boolean isNotAllowDataCheck(final String jobId) {
+        for (JobProgress progress : ruleAlteredJobAPI.getProgress(jobId).values()) {
+            boolean incremental = null != progress && JobStatus.EXECUTE_INCREMENTAL_TASK.equals(progress.getStatus());
+            if (!incremental) {
+                return true;
+            }
+        }
+        // No disqualifying entry was found; an empty progress map also ends up here.
+        return false;
+    }
+    
     private boolean dataConsistencyCheck(final JobConfiguration jobConfig) {
         String jobId = jobConfig.getHandleConfig().getJobId();
         log.info("dataConsistencyCheck for job {}", jobId);