You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/11 11:21:56 UTC
[shardingsphere] branch master updated: Improve consistency check job stop/cancel/progress (#21501)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0b169e60577 Improve consistency check job stop/cancel/progress (#21501)
0b169e60577 is described below
commit 0b169e605772ca827976d277b90cb2bb5064a1d5
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Oct 11 19:21:45 2022 +0800
Improve consistency check job stop/cancel/progress (#21501)
* Fix remaining seconds
* Handle other (runtime) exceptions when canceling a data consistency check
* Clean error message before checking
* Skip persisting the exception when the check is being canceled
* Fix getJobProgressInfo when check job is stopped
* Unit test
---
.../consistency/DataConsistencyCalculateAlgorithm.java | 7 +++++++
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 2 ++
.../AbstractDataConsistencyCalculateAlgorithm.java | 7 +++++--
.../scenario/consistencycheck/ConsistencyCheckJob.java | 4 ++++
.../consistencycheck/ConsistencyCheckJobAPIImpl.java | 12 +++++++++---
.../consistencycheck/ConsistencyCheckTasksRunner.java | 17 +++++++++++++----
.../data/pipeline/scenario/migration/MigrationJob.java | 6 ++----
.../DataConsistencyCalculateAlgorithmFixture.java | 5 +++++
8 files changed, 47 insertions(+), 13 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
index 6ee50d96e10..64474fbc009 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
@@ -43,4 +43,11 @@ public interface DataConsistencyCalculateAlgorithm extends ShardingSphereAlgorit
* @throws SQLException SQL exception if cancel underlying SQL execution failure
*/
void cancel() throws SQLException;
+
+ /**
+ * Is calculation canceling.
+ *
+ * @return canceling or not
+ */
+ boolean isCanceling();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index f943ae5b661..6812e54efaf 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -124,6 +124,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
jobConfigPOJO.setDisabled(false);
jobConfigPOJO.getProps().remove("stop_time");
+ jobConfigPOJO.getProps().remove("stop_time_millis");
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
String barrierPath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
@@ -137,6 +138,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
jobConfigPOJO.setDisabled(true);
jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
+ jobConfigPOJO.getProps().setProperty("stop_time_millis", System.currentTimeMillis() + "");
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
index 7e14ced4342..4bb6c51e462 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
-import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
@@ -32,7 +31,7 @@ import java.sql.Statement;
@Slf4j
public abstract class AbstractDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
- @Getter(AccessLevel.PROTECTED)
+ @Getter
private volatile boolean canceling;
private volatile Statement currentStatement;
@@ -57,6 +56,10 @@ public abstract class AbstractDataConsistencyCalculateAlgorithm implements DataC
log.info("cancel is not supported: {}", ex.getMessage());
} catch (final SQLException ex) {
log.info("cancel failed: {}", ex.getMessage());
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.info("cancel failed: {}", ex.getMessage());
}
log.info("cancel cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index cce3ea67e4c..5cc1b0a5066 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -36,6 +36,8 @@ import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@Slf4j
public final class ConsistencyCheckJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
+ private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+
private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
@Override
@@ -54,6 +56,8 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
return;
}
+ log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(), shardingItem);
+ jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
tasksRunner.start();
PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId, shardingContext.getShardingItem());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index fa2b7d9ebd7..ee12d63fa97 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -58,6 +58,7 @@ import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
/**
* Consistency check job API impl.
@@ -192,9 +193,14 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
} else {
long checkedRecordsCount = Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
result.setFinishedPercentage((int) (checkedRecordsCount * 100 / recordsCount));
- Duration duration = Duration.between(checkBeginTime, LocalDateTime.now());
- result.setDurationSeconds(duration.getSeconds());
- long remainingMills = (recordsCount - checkedRecordsCount) / recordsCount * duration.toMillis();
+ JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(checkJobId);
+ Long stopTimeMillis = jobConfigPOJO.isDisabled() ? Long.parseLong(jobConfigPOJO.getProps().getProperty("stop_time_millis")) : null;
+ long durationMillis = (null != stopTimeMillis ? stopTimeMillis : System.currentTimeMillis()) - jobItemProgress.getCheckBeginTimeMillis();
+ result.setDurationSeconds(TimeUnit.MILLISECONDS.toSeconds(durationMillis));
+ if (null != stopTimeMillis) {
+ result.setCheckEndTime(DATE_TIME_FORMATTER.format(new Timestamp(stopTimeMillis).toLocalDateTime()));
+ }
+ long remainingMills = (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis);
result.setRemainingSeconds(remainingMills / 1000);
}
String tableNames = jobItemProgress.getTableNames();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
index a13093312a4..b7aeb1b8796 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -17,7 +17,9 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
@@ -58,6 +60,9 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
private final ExecuteCallback checkExecuteCallback;
+ @Setter(AccessLevel.PRIVATE)
+ private volatile DataConsistencyCalculateAlgorithm calculateAlgorithm;
+
public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
this.jobItemContext = jobItemContext;
checkJobConfig = jobItemContext.getJobConfig();
@@ -87,8 +92,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
private final class CheckLifecycleExecutor extends AbstractLifecycleExecutor {
- private volatile DataConsistencyCalculateAlgorithm calculateAlgorithm;
-
@Override
protected void runBlocking() {
log.info("execute consistency check, check job id: {}, parent job id: {}", checkJobId, parentJobId);
@@ -98,7 +101,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(parentJobId);
DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(
parentJobConfig, checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
- this.calculateAlgorithm = calculateAlgorithm;
+ setCalculateAlgorithm(calculateAlgorithm);
Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, jobItemContext);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
jobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
@@ -130,7 +133,13 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
@Override
public void onFailure(final Throwable throwable) {
- log.info("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId);
+ DataConsistencyCalculateAlgorithm algorithm = calculateAlgorithm;
+ if (null != algorithm && algorithm.isCanceling()) {
+ log.info("onFailure, canceling, check job id: {}, parent job id: {}", checkJobId, parentJobId);
+ checkJobAPI.stop(checkJobId);
+ return;
+ }
+ log.info("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId, throwable);
checkJobAPI.persistJobItemErrorMessage(checkJobId, 0, throwable);
jobItemContext.setStatus(JobStatus.CONSISTENCY_CHECK_FAILURE);
checkJobAPI.persistJobItemProgress(jobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 244dc46efcf..365a5b28fe6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -20,21 +20,19 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -75,7 +73,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
return;
}
log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(), shardingItem);
- PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+ jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
InventoryIncrementalTasksRunner tasksRunner = new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
runInBackground(() -> {
prepare(jobItemContext);
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
index 051ce468505..a0ce9457fb9 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
@@ -48,6 +48,11 @@ public final class DataConsistencyCalculateAlgorithmFixture implements DataConsi
public void cancel() {
}
+ @Override
+ public boolean isCanceling() {
+ return false;
+ }
+
@Override
public Collection<String> getSupportedDatabaseTypes() {
return DatabaseTypeFactory.getInstances().stream().map(DatabaseType::getType).collect(Collectors.toList());