You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/11 11:21:56 UTC

[shardingsphere] branch master updated: Improve consistency check job stop/cancel/progress (#21501)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b169e60577 Improve consistency check job stop/cancel/progress (#21501)
0b169e60577 is described below

commit 0b169e605772ca827976d277b90cb2bb5064a1d5
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Oct 11 19:21:45 2022 +0800

    Improve consistency check job stop/cancel/progress (#21501)
    
    * Fix remaining seconds
    
    * Handle other exceptions when canceling the data consistency check
    
    * Clear the job item error message before checking
    
    * Skip persisting the exception when the check is being canceled
    
    * Fix getJobProgressInfo when check job is stopped
    
    * Unit test
---
 .../consistency/DataConsistencyCalculateAlgorithm.java  |  7 +++++++
 .../core/api/impl/AbstractPipelineJobAPIImpl.java       |  2 ++
 .../AbstractDataConsistencyCalculateAlgorithm.java      |  7 +++++--
 .../scenario/consistencycheck/ConsistencyCheckJob.java  |  4 ++++
 .../consistencycheck/ConsistencyCheckJobAPIImpl.java    | 12 +++++++++---
 .../consistencycheck/ConsistencyCheckTasksRunner.java   | 17 +++++++++++++----
 .../data/pipeline/scenario/migration/MigrationJob.java  |  6 ++----
 .../DataConsistencyCalculateAlgorithmFixture.java       |  5 +++++
 8 files changed, 47 insertions(+), 13 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
index 6ee50d96e10..64474fbc009 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
@@ -43,4 +43,11 @@ public interface DataConsistencyCalculateAlgorithm extends ShardingSphereAlgorit
      * @throws SQLException SQL exception if cancel underlying SQL execution failure
      */
     void cancel() throws SQLException;
+    
+    /**
+     * Is calculation canceling.
+     *
+     * @return canceling or not
+     */
+    boolean isCanceling();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index f943ae5b661..6812e54efaf 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -124,6 +124,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
         jobConfigPOJO.setDisabled(false);
         jobConfigPOJO.getProps().remove("stop_time");
+        jobConfigPOJO.getProps().remove("stop_time_millis");
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
         String barrierPath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
         pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
@@ -137,6 +138,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         jobConfigPOJO.setDisabled(true);
         jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
+        jobConfigPOJO.getProps().setProperty("stop_time_millis", System.currentTimeMillis() + "");
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
         String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
         pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
index 7e14ced4342..4bb6c51e462 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
 
-import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
@@ -32,7 +31,7 @@ import java.sql.Statement;
 @Slf4j
 public abstract class AbstractDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
     
-    @Getter(AccessLevel.PROTECTED)
+    @Getter
     private volatile boolean canceling;
     
     private volatile Statement currentStatement;
@@ -57,6 +56,10 @@ public abstract class AbstractDataConsistencyCalculateAlgorithm implements DataC
             log.info("cancel is not supported: {}", ex.getMessage());
         } catch (final SQLException ex) {
             log.info("cancel failed: {}", ex.getMessage());
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            log.info("cancel failed: {}", ex.getMessage());
         }
         log.info("cancel cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index cce3ea67e4c..5cc1b0a5066 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -36,6 +36,8 @@ import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 @Slf4j
 public final class ConsistencyCheckJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
     
+    private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+    
     private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
     
     @Override
@@ -54,6 +56,8 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
             log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
             return;
         }
+        log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(), shardingItem);
+        jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
         tasksRunner.start();
         PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId, shardingContext.getShardingItem());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index fa2b7d9ebd7..ee12d63fa97 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -58,6 +58,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Consistency check job API impl.
@@ -192,9 +193,14 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
         } else {
             long checkedRecordsCount = Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
             result.setFinishedPercentage((int) (checkedRecordsCount * 100 / recordsCount));
-            Duration duration = Duration.between(checkBeginTime, LocalDateTime.now());
-            result.setDurationSeconds(duration.getSeconds());
-            long remainingMills = (recordsCount - checkedRecordsCount) / recordsCount * duration.toMillis();
+            JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(checkJobId);
+            Long stopTimeMillis = jobConfigPOJO.isDisabled() ? Long.parseLong(jobConfigPOJO.getProps().getProperty("stop_time_millis")) : null;
+            long durationMillis = (null != stopTimeMillis ? stopTimeMillis : System.currentTimeMillis()) - jobItemProgress.getCheckBeginTimeMillis();
+            result.setDurationSeconds(TimeUnit.MILLISECONDS.toSeconds(durationMillis));
+            if (null != stopTimeMillis) {
+                result.setCheckEndTime(DATE_TIME_FORMATTER.format(new Timestamp(stopTimeMillis).toLocalDateTime()));
+            }
+            long remainingMills = (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis);
             result.setRemainingSeconds(remainingMills / 1000);
         }
         String tableNames = jobItemProgress.getTableNames();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
index a13093312a4..b7aeb1b8796 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -17,7 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
+import lombok.AccessLevel;
 import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
@@ -58,6 +60,9 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
     
     private final ExecuteCallback checkExecuteCallback;
     
+    @Setter(AccessLevel.PRIVATE)
+    private volatile DataConsistencyCalculateAlgorithm calculateAlgorithm;
+    
     public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
         this.jobItemContext = jobItemContext;
         checkJobConfig = jobItemContext.getJobConfig();
@@ -87,8 +92,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
     
     private final class CheckLifecycleExecutor extends AbstractLifecycleExecutor {
         
-        private volatile DataConsistencyCalculateAlgorithm calculateAlgorithm;
-        
         @Override
         protected void runBlocking() {
             log.info("execute consistency check, check job id: {}, parent job id: {}", checkJobId, parentJobId);
@@ -98,7 +101,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
             PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(parentJobId);
             DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(
                     parentJobConfig, checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
-            this.calculateAlgorithm = calculateAlgorithm;
+            setCalculateAlgorithm(calculateAlgorithm);
             Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, jobItemContext);
             PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
             jobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
@@ -130,7 +133,13 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
         
         @Override
         public void onFailure(final Throwable throwable) {
-            log.info("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId);
+            DataConsistencyCalculateAlgorithm algorithm = calculateAlgorithm;
+            if (null != algorithm && algorithm.isCanceling()) {
+                log.info("onFailure, canceling, check job id: {}, parent job id: {}", checkJobId, parentJobId);
+                checkJobAPI.stop(checkJobId);
+                return;
+            }
+            log.info("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId, throwable);
             checkJobAPI.persistJobItemErrorMessage(checkJobId, 0, throwable);
             jobItemContext.setStatus(JobStatus.CONSISTENCY_CHECK_FAILURE);
             checkJobAPI.persistJobItemProgress(jobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 244dc46efcf..365a5b28fe6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -20,21 +20,19 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
@@ -75,7 +73,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
             return;
         }
         log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(), shardingItem);
-        PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+        jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         InventoryIncrementalTasksRunner tasksRunner = new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
         runInBackground(() -> {
             prepare(jobItemContext);
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
index 051ce468505..a0ce9457fb9 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
@@ -48,6 +48,11 @@ public final class DataConsistencyCalculateAlgorithmFixture implements DataConsi
     public void cancel() {
     }
     
+    @Override
+    public boolean isCanceling() {
+        return false;
+    }
+    
     @Override
     public Collection<String> getSupportedDatabaseTypes() {
         return DatabaseTypeFactory.getInstances().stream().map(DatabaseType::getType).collect(Collectors.toList());