You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2022/11/30 08:12:59 UTC
[shardingsphere] branch master updated: Support drop migration consistency check job (#22527)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 453bb11f792 Support drop migration consistency check job (#22527)
453bb11f792 is described below
commit 453bb11f7921def26d42872b0647e1000135b42c
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Nov 30 16:12:47 2022 +0800
Support drop migration consistency check job (#22527)
* Add DROP MIGRATION CHECK jobId DistSQL and impl
* Refactor consistency check sequence
* Add dropByParentJobId unit test
* Fix code style by review
---
.../distsql/syntax/ral/_index.cn.md | 1 +
.../distsql/syntax/ral/_index.en.md | 1 +
.../pipeline/api/ConsistencyCheckJobPublicAPI.java | 7 +++
.../pipeline/core/api/GovernanceRepositoryAPI.java | 8 +++
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 5 ++
.../handler/update/DropMigrationCheckUpdater.java | 41 ++++++++++++
.../main/antlr4/imports/migration/RALStatement.g4 | 4 ++
.../parser/autogen/MigrationDistSQLStatement.g4 | 1 +
.../core/MigrationDistSQLStatementVisitor.java | 7 +++
.../statement/DropMigrationCheckStatement.java} | 33 ++++------
.../ConsistencyCheckJobAPIImpl.java | 40 ++++++++----
.../consistencycheck/ConsistencyCheckJobId.java | 8 +++
.../consistencycheck/ConsistencyCheckSequence.java | 72 ++++++++++++++++++++++
.../update/DropMigrationCheckStatementAssert.java | 48 +++++++++++++++
.../cases/parser/jaxb/RootSQLParserTestCases.java | 4 ++
.../DropMigrationCheckStatementTestCase.java | 36 +++++------
.../src/main/resources/case/ral/migration.xml | 4 ++
.../main/resources/sql/supported/ral/migration.xml | 1 +
.../api/impl/ConsistencyCheckJobAPIImplTest.java | 43 ++++++++++++-
.../pipeline/core/util/PipelineContextUtil.java | 2 +-
.../ConsistencyCheckSequenceTest.java | 56 +++++++++++++++++
21 files changed, 369 insertions(+), 53 deletions(-)
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/_index.cn.md b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/_index.cn.md
index 53bc2f35274..a70af3a227f 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/_index.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/_index.cn.md
@@ -33,6 +33,7 @@ RAL (Resource & Rule Administration Language) 为 Apache ShardingSphere 的管
| SHOW MIGRATION CHECK STATUS jobId | 查询数据一致性校验进度 | SHOW MIGRATION CHECK STATUS 1234 |
| STOP MIGRATION CHECK jobId | 停止数据一致性校验 | STOP MIGRATION CHECK 1234 |
| START MIGRATION CHECK jobId | 开启数据一致性校验 | START MIGRATION CHECK 1234 |
+| DROP MIGRATION CHECK jobId | 删除数据一致性校验作业 | DROP MIGRATION CHECK 1234 |
| ROLLBACK MIGRATION jobId | 撤销作业。注意:该语句会清理目标端表,请谨慎操作 | ROLLBACK MIGRATION 1234 |
| COMMIT MIGRATION jobId | 完成作业 | COMMIT MIGRATION 1234 |
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/_index.en.md b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/_index.en.md
index 969e3695d1a..6ebaa4d6b41 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/_index.en.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/_index.en.md
@@ -33,6 +33,7 @@ RAL (Resource & Rule Administration Language) responsible for hint, circuit brea
| SHOW MIGRATION CHECK STATUS jobId | Query data consistency check status | SHOW MIGRATION CHECK STATUS 1234 |
| STOP MIGRATION CHECK jobId | Stop data consistency check | STOP MIGRATION CHECK 1234 |
| START MIGRATION CHECK jobId | Start data consistency check | START MIGRATION CHECK 1234 |
+| DROP MIGRATION CHECK jobId | Drop data consistency check | DROP MIGRATION CHECK 1234 |
| ROLLBACK MIGRATION jobId | Rollback migration | ROLLBACK MIGRATION 1234 |
| COMMIT MIGRATION jobId | Commit migration | COMMIT MIGRATION 1234 |
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index d34b34a4717..1d8dd967437 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -61,6 +61,13 @@ public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, Requ
*/
void stopByParentJobId(String parentJobId);
+ /**
+ * Drop by parent job id.
+ *
+ * @param parentJobId parent job id
+ */
+ void dropByParentJobId(String parentJobId);
+
/**
* Get consistency job item info.
*
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index 17874b2e8bb..103213ba6e8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -63,6 +63,7 @@ public interface GovernanceRepositoryAPI {
* @param jobId job id
* @return check job id
*/
+ // TODO rename method name
Optional<String> getCheckLatestJobId(String jobId);
/**
@@ -73,6 +74,13 @@ public interface GovernanceRepositoryAPI {
*/
void persistCheckLatestJobId(String jobId, String checkJobId);
+ /**
+ * Delete check latest job id.
+ *
+ * @param jobId job id
+ */
+ void deleteCheckLatestJobId(String jobId);
+
/**
* Get check job result.
*
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 0e27980841d..b30236452d7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -75,6 +75,11 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
repository.persist(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId), String.valueOf(checkJobId));
}
+ @Override
+ public void deleteCheckLatestJobId(final String jobId) {
+ repository.delete(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId));
+ }
+
@SuppressWarnings("unchecked")
@Override
public Map<String, DataConsistencyCheckResult> getCheckJobResult(final String parentJobId, final String checkJobId) {
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationCheckUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationCheckUpdater.java
new file mode 100644
index 00000000000..5a677ee87cb
--- /dev/null
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationCheckUpdater.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.migration.distsql.handler.update;
+
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
+import org.apache.shardingsphere.migration.distsql.statement.DropMigrationCheckStatement;
+
+/**
+ * Drop migration check updater.
+ */
+public final class DropMigrationCheckUpdater implements RALUpdater<DropMigrationCheckStatement> {
+
+ private final ConsistencyCheckJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI();
+
+ @Override
+ public void executeUpdate(final String databaseName, final DropMigrationCheckStatement sqlStatement) {
+ jobAPI.dropByParentJobId(sqlStatement.getJobId());
+ }
+
+ @Override
+ public String getType() {
+ return DropMigrationCheckStatement.class.getName();
+ }
+}
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/migration/RALStatement.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/migration/RALStatement.g4
index 9f7b2023f66..74a5c36d3b7 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/migration/RALStatement.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/migration/RALStatement.g4
@@ -63,6 +63,10 @@ startMigrationCheck
: START MIGRATION CHECK jobId
;
+dropMigrationCheck
+ : DROP MIGRATION CHECK jobId
+ ;
+
showMigrationCheckAlgorithms
: SHOW MIGRATION CHECK ALGORITHMS
;
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
index b121ce16b6c..cb252f2728c 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
@@ -35,5 +35,6 @@ execute
| showMigrationCheckStatus
| startMigrationCheck
| stopMigrationCheck
+ | dropMigrationCheck
) SEMI?
;
diff --git a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
index 25258241a20..910f1074e19 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatemen
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.AlgorithmDefinitionContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.CheckMigrationContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.CommitMigrationContext;
+import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.DropMigrationCheckContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.MigrateTableContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PasswordContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PropertiesDefinitionContext;
@@ -47,6 +48,7 @@ import org.apache.shardingsphere.distsql.parser.segment.HostnameAndPortBasedData
import org.apache.shardingsphere.distsql.parser.segment.URLBasedDataSourceSegment;
import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.DropMigrationCheckStatement;
import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
import org.apache.shardingsphere.migration.distsql.statement.RegisterMigrationSourceStorageUnitStatement;
import org.apache.shardingsphere.migration.distsql.statement.RollbackMigrationStatement;
@@ -200,4 +202,9 @@ public final class MigrationDistSQLStatementVisitor extends MigrationDistSQLStat
public ASTNode visitStopMigrationCheck(final StopMigrationCheckContext ctx) {
return new StopMigrationCheckStatement(getIdentifierValue(ctx.jobId()));
}
+
+ @Override
+ public ASTNode visitDropMigrationCheck(final DropMigrationCheckContext ctx) {
+ return new DropMigrationCheckStatement(getIdentifierValue(ctx.jobId()));
+ }
}
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/DropMigrationCheckStatement.java
similarity index 60%
copy from kernel/data-pipeline/distsql/parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
copy to kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/DropMigrationCheckStatement.java
index b121ce16b6c..1e0f4dc79fc 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/DropMigrationCheckStatement.java
@@ -15,25 +15,18 @@
* limitations under the License.
*/
-grammar MigrationDistSQLStatement;
+package org.apache.shardingsphere.migration.distsql.statement;
-import Symbol, RALStatement, RQLStatement;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
-execute
- : (showMigrationList
- | showMigrationStatus
- | migrateTable
- | startMigration
- | stopMigration
- | rollbackMigration
- | commitMigration
- | checkMigration
- | showMigrationCheckAlgorithms
- | registerMigrationSourceStorageUnit
- | unregisterMigrationSourceStorageUnit
- | showMigrationSourceStorageUnits
- | showMigrationCheckStatus
- | startMigrationCheck
- | stopMigrationCheck
- ) SEMI?
- ;
+/**
+ * Drop migration check statement.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class DropMigrationCheckStatement extends UpdatableScalingRALStatement {
+
+ private final String jobId;
+}
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 5fe76f9beb2..34851b7940d 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -54,10 +54,12 @@ import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/**
* Consistency check job API impl.
@@ -87,8 +89,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
throw new UncompletedConsistencyCheckJobExistsException(checkLatestJobId.get());
}
}
- int sequence = checkLatestJobId.map(optional -> ConsistencyCheckJobId.parseSequence(optional) + 1).orElse(ConsistencyCheckJobId.MIN_SEQUENCE);
- String result = marshalJobId(new ConsistencyCheckJobId(parentJobId, sequence));
+ String result = marshalJobId(checkLatestJobId.map(s -> new ConsistencyCheckJobId(parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(parentJobId)));
repositoryAPI.persistCheckLatestJobId(parentJobId, result);
repositoryAPI.deleteCheckJobResult(parentJobId, result);
dropJob(result);
@@ -139,7 +140,6 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void startDisabledJob(final String jobId) {
- log.info("Start disable check job {}", jobId);
PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
if (null != jobProgress && JobStatus.FINISHED == jobProgress.getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
@@ -150,18 +150,36 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void startByParentJobId(final String parentJobId) {
- log.info("Start check job by parent job id: {}", parentJobId);
- Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
- ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
- startDisabledJob(checkLatestJobId.get());
+ startDisabledJob(getCheckLatestJobId(parentJobId));
+ }
+
+ private String getCheckLatestJobId(final String parentJobId) {
+ Optional<String> result = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+ ShardingSpherePreconditions.checkState(result.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
+ return result.get();
}
@Override
public void stopByParentJobId(final String parentJobId) {
- log.info("Stop check job by parent job id: {}", parentJobId);
- Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
- ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
- stop(checkLatestJobId.get());
+ stop(getCheckLatestJobId(parentJobId));
+ }
+
+ @Override
+ public void dropByParentJobId(final String parentJobId) {
+ String latestCheckJobId = getCheckLatestJobId(parentJobId);
+ stop(latestCheckJobId);
+ GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
+ Collection<String> checkJobIds = repositoryAPI.listCheckJobIds(parentJobId);
+ Optional<Integer> previousSequence = ConsistencyCheckSequence.getPreviousSequence(
+ checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId));
+ if (previousSequence.isPresent()) {
+ String checkJobId = marshalJobId(new ConsistencyCheckJobId(parentJobId, previousSequence.get()));
+ repositoryAPI.persistCheckLatestJobId(parentJobId, checkJobId);
+ } else {
+ repositoryAPI.deleteCheckLatestJobId(parentJobId);
+ }
+ repositoryAPI.deleteCheckJobResult(parentJobId, latestCheckJobId);
+ dropJob(latestCheckJobId);
}
@Override
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index e57f6db643e..63d0c44d7d4 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -39,6 +39,14 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
private final int sequence;
+ public ConsistencyCheckJobId(final String parentJobId) {
+ this(parentJobId, ConsistencyCheckSequence.MIN_SEQUENCE);
+ }
+
+ public ConsistencyCheckJobId(final String parentJobId, final String latestCheckJobId) {
+ this(parentJobId, ConsistencyCheckSequence.getNextSequence(parseSequence(latestCheckJobId)));
+ }
+
public ConsistencyCheckJobId(final String parentJobId, final int sequence) {
super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
this.parentJobId = parentJobId;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequence.java b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequence.java
new file mode 100644
index 00000000000..3a491d8ce40
--- /dev/null
+++ b/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequence.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.Getter;
+import lombok.ToString;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Consistency check sequence.
+ */
+@Getter
+@ToString(callSuper = true)
+public final class ConsistencyCheckSequence {
+
+ public static final int MIN_SEQUENCE = 1;
+
+ private static final int MAX_SEQUENCE = 3;
+
+ /**
+ * Get next sequence.
+ *
+ * @param currentSequence current sequence
+ * @return next sequence
+ */
+ public static int getNextSequence(final int currentSequence) {
+ int nextSequence = currentSequence + 1;
+ return nextSequence > MAX_SEQUENCE ? MIN_SEQUENCE : nextSequence;
+ }
+
+ /**
+ * Get previous sequence.
+ *
+ * @param sequences sequence list
+ * @param currentSequence current sequence
+ * @return previous sequence
+ */
+ public static Optional<Integer> getPreviousSequence(final List<Integer> sequences, final int currentSequence) {
+ if (sequences.size() <= 1) {
+ return Optional.empty();
+ }
+ sequences.sort(Integer::compareTo);
+ Integer index = null;
+ for (int i = 0; i < sequences.size(); i++) {
+ if (sequences.get(i) == currentSequence) {
+ index = i;
+ break;
+ }
+ }
+ if (null == index) {
+ return Optional.empty();
+ }
+ return Optional.of(index >= 1 ? sequences.get(index - 1) : MAX_SEQUENCE);
+ }
+}
diff --git a/test/parser/src/main/java/org/apache/shardingsphere/test/sql/parser/internal/asserts/statement/ral/impl/migration/update/DropMigrationCheckStatementAssert.java b/test/parser/src/main/java/org/apache/shardingsphere/test/sql/parser/internal/asserts/statement/ral/impl/migration/update/DropMigrationCheckStatementAssert.java
new file mode 100644
index 00000000000..ec6e7b4f801
--- /dev/null
+++ b/test/parser/src/main/java/org/apache/shardingsphere/test/sql/parser/internal/asserts/statement/ral/impl/migration/update/DropMigrationCheckStatementAssert.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.sql.parser.internal.asserts.statement.ral.impl.migration.update;
+
+import org.apache.shardingsphere.migration.distsql.statement.DropMigrationCheckStatement;
+import org.apache.shardingsphere.test.sql.parser.internal.asserts.SQLCaseAssertContext;
+import org.apache.shardingsphere.test.sql.parser.internal.asserts.statement.ral.impl.migration.JobIdAssert;
+import org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.DropMigrationCheckStatementTestCase;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Drop migration check statement assert.
+ */
+public final class DropMigrationCheckStatementAssert {
+
+ /**
+ * Assert drop migration check statement is correct with expected parser result.
+ *
+ * @param assertContext assert context
+ * @param actual actual drop migration check statement
+ * @param expected expected drop migration check statement test case
+ */
+ public static void assertIs(final SQLCaseAssertContext assertContext, final DropMigrationCheckStatement actual, final DropMigrationCheckStatementTestCase expected) {
+ if (null == expected) {
+ assertNull(assertContext.getText("Actual statement should not exist."), actual);
+ } else {
+ assertNotNull(assertContext.getText("Actual statement should exist."), actual);
+ JobIdAssert.assertJobId(assertContext, actual.getJobId(), expected.getJobId());
+ }
+ }
+}
diff --git a/test/parser/src/main/java/org/apache/shardingsphere/test/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java b/test/parser/src/main/java/org/apache/shardingsphere/test/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index 9b5337fcc01..7ba2c2db768 100644
--- a/test/parser/src/main/java/org/apache/shardingsphere/test/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++ b/test/parser/src/main/java/org/apache/shardingsphere/test/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -317,6 +317,7 @@ import org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.stat
import org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.statement.ral.UnlabelComputeNodeStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CommitMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.DropMigrationCheckStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.MigrateTableStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.RegisterMigrationSourceStorageUnitStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.RollbackMigrationStatementTestCase;
@@ -1008,6 +1009,9 @@ public final class RootSQLParserTestCases {
@XmlElement(name = "start-migration-check")
private final List<StartMigrationCheckStatementTestCase> startMigrationCheckTestCases = new LinkedList<>();
+ @XmlElement(name = "drop-migration-check")
+ private final List<DropMigrationCheckStatementTestCase> dropMigrationCheckTestCases = new LinkedList<>();
+
@XmlElement(name = "migrate-table")
private final List<MigrateTableStatementTestCase> migrateTableTestCases = new LinkedList<>();
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/test/parser/src/main/java/org/apache/shardingsphere/test/sql/parser/internal/cases/parser/jaxb/statement/ral/migration/DropMigrationCheckStatementTestCase.java
similarity index 60%
copy from kernel/data-pipeline/distsql/parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
copy to test/parser/src/main/java/org/apache/shardingsphere/test/sql/parser/internal/cases/parser/jaxb/statement/ral/migration/DropMigrationCheckStatementTestCase.java
index b121ce16b6c..489e614a185 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/test/parser/src/main/java/org/apache/shardingsphere/test/sql/parser/internal/cases/parser/jaxb/statement/ral/migration/DropMigrationCheckStatementTestCase.java
@@ -15,25 +15,21 @@
* limitations under the License.
*/
-grammar MigrationDistSQLStatement;
+package org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.statement.ral.migration;
-import Symbol, RALStatement, RQLStatement;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.test.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
-execute
- : (showMigrationList
- | showMigrationStatus
- | migrateTable
- | startMigration
- | stopMigration
- | rollbackMigration
- | commitMigration
- | checkMigration
- | showMigrationCheckAlgorithms
- | registerMigrationSourceStorageUnit
- | unregisterMigrationSourceStorageUnit
- | showMigrationSourceStorageUnits
- | showMigrationCheckStatus
- | startMigrationCheck
- | stopMigrationCheck
- ) SEMI?
- ;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Drop migration check statement test case.
+ */
+@Getter
+@Setter
+public final class DropMigrationCheckStatementTestCase extends SQLParserTestCase {
+
+ @XmlElement(name = "job-id")
+ private String jobId;
+}
diff --git a/test/parser/src/main/resources/case/ral/migration.xml b/test/parser/src/main/resources/case/ral/migration.xml
index a72d8ff7914..c96ecec4d04 100644
--- a/test/parser/src/main/resources/case/ral/migration.xml
+++ b/test/parser/src/main/resources/case/ral/migration.xml
@@ -92,6 +92,10 @@
<job-id>123</job-id>
</stop-migration-check>
+ <drop-migration-check sql-case-id="drop-migration-check">
+ <job-id>123</job-id>
+ </drop-migration-check>
+
<unregister-migration-source-storage-unit sql-case-id="unregister-migration-source-storage-unit">
<data-source>ds_0</data-source>
<data-source>ds_1</data-source>
diff --git a/test/parser/src/main/resources/sql/supported/ral/migration.xml b/test/parser/src/main/resources/sql/supported/ral/migration.xml
index 9c4e979934d..44cfcefee53 100644
--- a/test/parser/src/main/resources/sql/supported/ral/migration.xml
+++ b/test/parser/src/main/resources/sql/supported/ral/migration.xml
@@ -35,5 +35,6 @@
<sql-case id="start-migration" value="START MIGRATION 123;" db-types="ShardingSphere" />
<sql-case id="stop-migration-check" value="STOP MIGRATION CHECK 123;" db-types="ShardingSphere" />
<sql-case id="start-migration-check" value="START MIGRATION CHECK 123;" db-types="ShardingSphere" />
+ <sql-case id="drop-migration-check" value="DROP MIGRATION CHECK 123;" db-types="ShardingSphere" />
<sql-case id="unregister-migration-source-storage-unit" value="UNREGISTER MIGRATION SOURCE STORAGE UNIT ds_0,ds_1" db-types="ShardingSphere" />
</sql-cases>
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
index 56612d25beb..0199094685f 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
@@ -21,13 +21,18 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckSequence;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.junit.BeforeClass;
@@ -39,6 +44,7 @@ import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -60,7 +66,7 @@ public final class ConsistencyCheckJobAPIImplTest {
String migrationJobId = "j0101test";
String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(migrationJobId, null, null));
ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) checkJobAPI.getJobConfiguration(checkJobId);
- int expectedSequence = ConsistencyCheckJobId.MIN_SEQUENCE;
+ int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = "j0201" + migrationJobId + expectedSequence;
assertThat(jobConfig.getJobId(), is(expectCheckJobId));
assertNull(jobConfig.getAlgorithmTypeName());
@@ -81,4 +87,39 @@ public final class ConsistencyCheckJobAPIImplTest {
assertThat(actualCheckResult.size(), is(expectedCheckResult.size()));
assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(), is(expectedCheckResult.get("t_order").getContentCheckResult().isMatched()));
}
+
+    /**
+     * Creates several consistency check jobs for one parent migration job, then drops them one at a time.
+     *
+     * <p>After each creation the latest check job id is expected to carry the next sequence, starting from
+     * {@link ConsistencyCheckSequence#MIN_SEQUENCE}. After each drop the latest check job id is expected to
+     * fall back to the previous sequence, until none is left.</p>
+     */
+    @Test
+    public void assertDropByParentJobId() {
+        String parentJobId = getParentJobId(JobConfigurationBuilder.createJobConfiguration());
+        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
+        // Derive every expected sequence from MIN_SEQUENCE and one job count instead of scattered literals.
+        int firstSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
+        int checkJobCount = 3;
+        for (int i = 0; i < checkJobCount; i++) {
+            String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null));
+            // Persist a FINISHED progress and a check result for the job just created, before the next iteration creates another one.
+            ConsistencyCheckJobItemContext checkJobItemContext = new ConsistencyCheckJobItemContext(
+                    new ConsistencyCheckJobConfiguration(checkJobId, parentJobId, null, null), 0, JobStatus.FINISHED, null);
+            checkJobAPI.persistJobItemProgress(checkJobItemContext);
+            Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = Collections.singletonMap("t_order",
+                    new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(0, 0), new DataConsistencyContentCheckResult(true)));
+            repositoryAPI.persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
+            Optional<String> latestCheckJobId = repositoryAPI.getCheckLatestJobId(parentJobId);
+            assertTrue(latestCheckJobId.isPresent());
+            assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), is(firstSequence + i));
+        }
+        // Drop all but the last remaining job: the latest id is expected to step back one sequence per drop.
+        for (int expectedSequence = firstSequence + checkJobCount - 2; expectedSequence >= firstSequence; expectedSequence--) {
+            checkJobAPI.dropByParentJobId(parentJobId);
+            Optional<String> latestCheckJobId = repositoryAPI.getCheckLatestJobId(parentJobId);
+            assertTrue(latestCheckJobId.isPresent());
+            assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), is(expectedSequence));
+        }
+        // Dropping the final check job must leave no latest check job id behind.
+        checkJobAPI.dropByParentJobId(parentJobId);
+        assertFalse(repositoryAPI.getCheckLatestJobId(parentJobId).isPresent());
+    }
+
+    /**
+     * Starts a migration job from the given configuration and returns the id reported by the migration job API.
+     *
+     * @param jobConfig migration job configuration to start
+     * @return id returned by {@code migrationJobAPI.start}
+     * @throws AssertionError if the API returns no id
+     */
+    private String getParentJobId(final MigrationJobConfiguration jobConfig) {
+        return migrationJobAPI.start(jobConfig).orElseThrow(AssertionError::new);
+    }
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 66c28c5d3ce..33c3e0f9c9e 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -69,7 +69,7 @@ public final class PipelineContextUtil {
private static final LazyInitializer<ClusterPersistRepository> PERSIST_REPOSITORY_LAZY_INITIALIZER;
static {
- PERSIST_REPOSITORY_CONFIG = new ClusterPersistRepositoryConfiguration("Zookeeper", "test", EmbedTestingServer.getConnectionString(), new Properties());
+ PERSIST_REPOSITORY_CONFIG = new ClusterPersistRepositoryConfiguration("ZooKeeper", "test", EmbedTestingServer.getConnectionString(), new Properties());
PERSIST_REPOSITORY_LAZY_INITIALIZER = new LazyInitializer<ClusterPersistRepository>() {
@Override
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequenceTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequenceTest.java
new file mode 100644
index 00000000000..0be433f7240
--- /dev/null
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequenceTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the static helpers of {@link ConsistencyCheckSequence}.
+ */
+public final class ConsistencyCheckSequenceTest {
+
+    /**
+     * Steps forward from {@link ConsistencyCheckSequence#MIN_SEQUENCE} three times and expects 2, 3 and then 1.
+     */
+    @Test
+    public void assertGetNextSequence() {
+        int second = ConsistencyCheckSequence.getNextSequence(ConsistencyCheckSequence.MIN_SEQUENCE);
+        assertThat(second, is(2));
+        int third = ConsistencyCheckSequence.getNextSequence(second);
+        assertThat(third, is(3));
+        // The step after 3 is expected to yield 1 again.
+        assertThat(ConsistencyCheckSequence.getNextSequence(third), is(1));
+    }
+
+    /**
+     * Looks up the previous sequence for each entry of an unordered list and for a value that is not in the list.
+     */
+    @Test
+    public void assertGetPreviousSequence() {
+        List<Integer> sequences = Arrays.asList(2, 3, 1);
+        assertPreviousSequence(sequences, 3, 2);
+        assertPreviousSequence(sequences, 2, 1);
+        assertPreviousSequence(sequences, 1, 3);
+        // 4 is not among the given sequences, so no previous sequence is expected.
+        assertFalse(ConsistencyCheckSequence.getPreviousSequence(sequences, 4).isPresent());
+    }
+
+    // Asserts that a previous sequence exists for currentSequence and equals expectedPreviousSequence.
+    private void assertPreviousSequence(final List<Integer> sequences, final int currentSequence, final int expectedPreviousSequence) {
+        Optional<Integer> actual = ConsistencyCheckSequence.getPreviousSequence(sequences, currentSequence);
+        assertTrue(actual.isPresent());
+        assertThat(actual.get(), is(expectedPreviousSequence));
+    }
+}