You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/27 05:21:39 UTC
[shardingsphere] branch master updated: Refactor check migration, change to use async job (#21194)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 098c08d153d Refactor check migration, change to use async job (#21194)
098c08d153d is described below
commit 098c08d153db09fdd080b212a9425839fc0eee46
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Sep 27 13:21:29 2022 +0800
Refactor check migration, change to use async job (#21194)
* Refactor check migration, change to use async job
* Remove unused prop
* Fix merge compiler error
* Fix codestyle
* Change param description
* Update class name
* Improve
---
...=> ShowMigrationCheckStatusQueryResultSet.java} | 44 +++---
.../handler/update/CheckMigrationJobUpdater.java | 49 ++++++
...dingsphere.infra.distsql.query.DistSQLResultSet | 2 +-
....shardingsphere.infra.distsql.update.RALUpdater | 1 +
.../main/antlr4/imports/migration/RALStatement.g4 | 4 +
.../parser/autogen/MigrationDistSQLStatement.g4 | 1 +
.../core/MigrationDistSQLStatementVisitor.java | 7 +
.../distsql/statement/CheckMigrationStatement.java | 4 +-
...java => ShowMigrationCheckStatusStatement.java} | 7 +-
.../ShowMigrationSourceResourcesStatement.java | 2 +-
.../pipeline/api/ConsistencyCheckJobPublicAPI.java | 48 ++++++
.../pipeline/api/PipelineJobPublicAPIFactory.java | 10 ++
.../yaml/YamlDataConsistencyCheckResult.java | 62 ++++++++
.../YamlDataConsistencyCheckResultSwapper.java | 69 +++++++++
.../job/ConsistencyCheckJobConfiguration.java | 35 ++++-
.../yaml/YamlConsistencyCheckJobConfiguration.java | 31 ++--
...amlConsistencyCheckJobConfigurationSwapper.java | 59 +++++++
.../job/progress/ConsistencyCheckJobProgress.java | 14 +-
.../pojo/CreateConsistencyCheckJobParameter.java | 18 ++-
.../pipeline/core/api/GovernanceRepositoryAPI.java | 35 ++---
.../AbstractInventoryIncrementalJobAPIImpl.java | 2 -
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 59 ++++---
.../job/PipelineJobExecutionException.java | 2 +-
... => PipelineJobHasAlreadyExistedException.java} | 10 +-
.../yaml/YamlConsistencyCheckJobProgress.java | 12 +-
.../YamlConsistencyCheckJobProgressSwapper.java | 42 +++++
.../core/metadata/node/PipelineMetaDataNode.java | 19 +--
...tencyCheckChangedJobConfigurationProcessor.java | 81 ++++++++++
.../consistencycheck/ConsistencyCheckJob.java | 88 +++++++++++
.../consistencycheck/ConsistencyCheckJobAPI.java | 11 +-
.../ConsistencyCheckJobAPIFactory.java} | 23 ++-
.../ConsistencyCheckJobAPIImpl.java | 172 +++++++++++++++++++++
.../consistencycheck/ConsistencyCheckJobId.java | 26 +++-
.../ConsistencyCheckJobItemContext.java | 59 +++++++
...data.pipeline.api.ConsistencyCheckJobPublicAPI} | 2 +-
...ingsphere.data.pipeline.core.api.PipelineJobAPI | 1 +
...andler.PipelineChangedJobConfigurationProcessor | 1 +
...enario.consistencycheck.ConsistencyCheckJobAPI} | 2 +-
.../metadata/node/PipelineMetaDataNodeTest.java | 8 +-
.../cases/migration/AbstractMigrationITCase.java | 17 +-
.../QueryableScalingRALStatementAssert.java | 10 +-
.../UpdatableScalingRALStatementAssert.java | 21 ++-
...> ShowMigrationCheckStatusStatementAssert.java} | 26 +---
.../CheckMigrationStatementAssert.java | 2 +-
.../jaxb/cases/domain/SQLParserTestCases.java | 12 +-
.../ShowMigrationCheckStatusStatementTestCase.java | 20 +--
.../src/main/resources/case/ral/migration.xml | 4 +
.../main/resources/sql/supported/ral/migration.xml | 1 +
.../data/pipeline/api/PipelineAPIFactoryTest.java | 6 +
.../api/PipelineJobPublicAPIFactoryTest.java | 6 +
.../api/impl/GovernanceRepositoryAPIImplTest.java | 17 +-
.../api/impl/ConsistencyCheckJobAPIImplTest.java | 55 +++++++
.../core/api/impl/MigrationJobAPIImplTest.java | 6 +-
.../ConsistencyCheckJobAPIFactoryTest.java} | 11 +-
54 files changed, 1124 insertions(+), 212 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
similarity index 53%
rename from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
rename to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
index e448711660d..5c90cfbb7b1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
@@ -17,48 +17,44 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
-import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Map.Entry;
/**
- * Query result set for check migration.
+ * Show migration check status query result set.
*/
-public final class CheckMigrationQueryResultSet implements DatabaseDistSQLResultSet {
+public final class ShowMigrationCheckStatusQueryResultSet implements DatabaseDistSQLResultSet {
- private static final MigrationJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
+ private static final ConsistencyCheckJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI();
private Iterator<Collection<Object>> data;
@Override
public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
- CheckMigrationStatement checkMigrationStatement = (CheckMigrationStatement) sqlStatement;
- AlgorithmSegment typeStrategy = checkMigrationStatement.getTypeStrategy();
- Map<String, DataConsistencyCheckResult> checkResultMap = null == typeStrategy
- ? JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId())
- : JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId(), typeStrategy.getName(), typeStrategy.getProps());
- data = checkResultMap.entrySet().stream()
- .map(each -> {
- Collection<Object> result = new LinkedList<>();
- result.add(each.getKey());
- result.add(each.getValue().getCountCheckResult().getSourceRecordsCount());
- result.add(each.getValue().getCountCheckResult().getTargetRecordsCount());
- result.add(each.getValue().getCountCheckResult().isMatched() + "");
- result.add(each.getValue().getContentCheckResult().isMatched() + "");
- return result;
- }).collect(Collectors.toList()).iterator();
+ ShowMigrationCheckStatusStatement checkMigrationStatement = (ShowMigrationCheckStatusStatement) sqlStatement;
+ Map<String, DataConsistencyCheckResult> consistencyCheckResult = JOB_API.getLatestDataConsistencyCheckResult(checkMigrationStatement.getJobId());
+ List<Collection<Object>> result = new ArrayList<>(consistencyCheckResult.size());
+ for (Entry<String, DataConsistencyCheckResult> entry : consistencyCheckResult.entrySet()) {
+ DataConsistencyCheckResult value = entry.getValue();
+ DataConsistencyCountCheckResult countCheckResult = value.getCountCheckResult();
+ result.add(Arrays.asList(entry.getKey(), countCheckResult.getSourceRecordsCount(), countCheckResult.getTargetRecordsCount(), String.valueOf(countCheckResult.isMatched()),
+ String.valueOf(value.getContentCheckResult().isMatched())));
+ }
+ data = result.iterator();
}
@Override
@@ -78,6 +74,6 @@ public final class CheckMigrationQueryResultSet implements DatabaseDistSQLResult
@Override
public String getType() {
- return CheckMigrationStatement.class.getName();
+ return ShowMigrationCheckStatusStatement.class.getName();
}
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
new file mode 100644
index 00000000000..275b158f8a9
--- /dev/null
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.migration.distsql.handler.update;
+
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
+import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
+import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Check migration job updater.
+ */
+public final class CheckMigrationJobUpdater implements RALUpdater<CheckMigrationStatement> {
+
+ private static final ConsistencyCheckJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI();
+
+ @Override
+ public void executeUpdate(final String databaseName, final CheckMigrationStatement sqlStatement) throws SQLException {
+ AlgorithmSegment typeStrategy = sqlStatement.getTypeStrategy();
+ String algorithmTypeName = null != typeStrategy ? typeStrategy.getName() : null;
+ Properties algorithmProps = null != typeStrategy ? typeStrategy.getProps() : null;
+ JOB_API.createJobAndStart(new CreateConsistencyCheckJobParameter(sqlStatement.getJobId(), algorithmTypeName, algorithmProps));
+ }
+
+ @Override
+ public String getType() {
+ return CheckMigrationStatement.class.getName();
+ }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet
index 51f4bb8bc3a..9f5e9244628 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet
@@ -23,7 +23,7 @@ org.apache.shardingsphere.sharding.distsql.handler.query.ShardingAuditorsQueryRe
org.apache.shardingsphere.sharding.distsql.handler.query.ShardingTableNodesQueryResultSet
org.apache.shardingsphere.sharding.distsql.handler.query.ShardingKeyGeneratorsQueryResultSet
org.apache.shardingsphere.sharding.distsql.handler.query.DefaultShardingStrategyQueryResultSet
-org.apache.shardingsphere.migration.distsql.handler.query.CheckMigrationQueryResultSet
+org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckStatusQueryResultSet
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationListQueryResultSet
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationJobStatusQueryResultSet
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckAlgorithmsQueryResultSet
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
index c7d37afa4af..0c026e4a415 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
@@ -22,3 +22,4 @@ org.apache.shardingsphere.migration.distsql.handler.update.CommitMigrationUpdate
org.apache.shardingsphere.migration.distsql.handler.update.RollbackMigrationUpdater
org.apache.shardingsphere.migration.distsql.handler.update.AddMigrationSourceResourceUpdater
org.apache.shardingsphere.migration.distsql.handler.update.DropMigrationSourceResourceUpdater
+org.apache.shardingsphere.migration.distsql.handler.update.CheckMigrationJobUpdater
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
index 787564c0679..e595c00fcc4 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
@@ -51,6 +51,10 @@ checkMigration
: CHECK MIGRATION jobId (BY algorithmDefinition)?
;
+showMigrationCheckStatus
+ : SHOW MIGRATION CHECK STATUS jobId
+ ;
+
showMigrationCheckAlgorithms
: SHOW MIGRATION CHECK ALGORITHMS
;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
index 2fb0c257080..40237cd1699 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
@@ -32,5 +32,6 @@ execute
| addMigrationSourceResource
| dropMigrationSourceResource
| showMigrationSourceResources
+ | showMigrationCheckStatus
) SEMI?
;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
index ba5be0ce157..b7c41e605e1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatemen
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ResourceDefinitionContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RollbackMigrationContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckAlgorithmsContext;
+import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckStatusContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationListContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationSourceResourcesContext;
import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationStatusContext;
@@ -48,6 +49,7 @@ import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStat
import org.apache.shardingsphere.migration.distsql.statement.DropMigrationSourceResourceStatement;
import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
import org.apache.shardingsphere.migration.distsql.statement.RollbackMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceResourcesStatement;
@@ -190,4 +192,9 @@ public final class MigrationDistSQLStatementVisitor extends MigrationDistSQLStat
public ASTNode visitShowMigrationSourceResources(final ShowMigrationSourceResourcesContext ctx) {
return new ShowMigrationSourceResourcesStatement();
}
+
+ @Override
+ public ASTNode visitShowMigrationCheckStatus(final ShowMigrationCheckStatusContext ctx) {
+ return new ShowMigrationCheckStatusStatement(getIdentifierValue(ctx.jobId()));
+ }
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
index d2bbeaa772f..b8aa727568d 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
@@ -20,14 +20,14 @@ package org.apache.shardingsphere.migration.distsql.statement;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
/**
* Check migration statement.
*/
@RequiredArgsConstructor
@Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
+public final class CheckMigrationStatement extends UpdatableScalingRALStatement {
private final String jobId;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
similarity index 82%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
copy to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
index d2bbeaa772f..fe8a36c0e9d 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
@@ -19,17 +19,14 @@ package org.apache.shardingsphere.migration.distsql.statement;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
/**
- * Check migration statement.
+ * Show migration check status statement.
*/
@RequiredArgsConstructor
@Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
+public final class ShowMigrationCheckStatusStatement extends QueryableScalingRALStatement {
private final String jobId;
-
- private final AlgorithmSegment typeStrategy;
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
index 47227004a30..4eb277b6d95 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
@@ -22,5 +22,5 @@ import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableS
/**
* Show migration source resources statement.
*/
-public class ShowMigrationSourceResourcesStatement extends QueryableScalingRALStatement {
+public final class ShowMigrationSourceResourcesStatement extends QueryableScalingRALStatement {
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
new file mode 100644
index 00000000000..cb6ed9ac55d
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api;
+
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+
+import java.util.Map;
+
+/**
+ * Consistency check job public API.
+ */
+@SingletonSPI
+public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, RequiredSPI {
+
+ /**
+ * Create consistency check job and start it.
+ *
+ * @param parameter create consistency check job parameter
+ * @return job id
+ */
+ String createJobAndStart(CreateConsistencyCheckJobParameter parameter);
+
+ /**
+ * Get latest data consistency check result.
+ *
+ * @param jobId job id
+ * @return latest data consistency check result
+ */
+ Map<String, DataConsistencyCheckResult> getLatestDataConsistencyCheckResult(String jobId);
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index a6ab527ac32..725630e86cc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -30,6 +30,7 @@ public final class PipelineJobPublicAPIFactory {
static {
ShardingSphereServiceLoader.register(InventoryIncrementalJobPublicAPI.class);
ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
+ ShardingSphereServiceLoader.register(ConsistencyCheckJobPublicAPI.class);
}
/**
@@ -50,4 +51,13 @@ public final class PipelineJobPublicAPIFactory {
public static MigrationJobPublicAPI getMigrationJobPublicAPI() {
return RequiredSPIRegistry.getRegisteredService(MigrationJobPublicAPI.class);
}
+
+ /**
+ * Get instance of consistency check job public API.
+ *
+ * @return got instance
+ */
+ public static ConsistencyCheckJobPublicAPI getConsistencyCheckJobPublicAPI() {
+ return RequiredSPIRegistry.getRegisteredService(ConsistencyCheckJobPublicAPI.class);
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResult.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResult.java
new file mode 100644
index 00000000000..661f2d58102
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+/**
+ * YAML data consistency check result, holding the count check result and the content check result.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public final class YamlDataConsistencyCheckResult implements YamlConfiguration {
+
+ private YamlDataConsistencyCountCheckResult countCheckResult;
+
+ private YamlDataConsistencyContentCheckResult contentCheckResult;
+
+ /**
+ * YAML data consistency count check result.
+ */
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class YamlDataConsistencyCountCheckResult {
+
+ private long sourceRecordsCount;
+
+ private long targetRecordsCount;
+
+ private boolean matched;
+ }
+
+ /**
+ * YAML data consistency content check result.
+ */
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class YamlDataConsistencyContentCheckResult {
+
+ private boolean matched;
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResultSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResultSwapper.java
new file mode 100644
index 00000000000..aadf937336c
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResultSwapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResult.YamlDataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResult.YamlDataConsistencyCountCheckResult;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML data consistency check result swapper.
+ */
+public final class YamlDataConsistencyCheckResultSwapper implements YamlConfigurationSwapper<YamlDataConsistencyCheckResult, DataConsistencyCheckResult> {
+
+ private static final YamlDataConsistencyCheckResultSwapper CONFIG_SWAPPER = new YamlDataConsistencyCheckResultSwapper();
+
+ @Override
+ public YamlDataConsistencyCheckResult swapToYamlConfiguration(final DataConsistencyCheckResult data) {
+ YamlDataConsistencyCountCheckResult countCheckResult = new YamlDataConsistencyCountCheckResult();
+ countCheckResult.setSourceRecordsCount(data.getCountCheckResult().getSourceRecordsCount());
+ countCheckResult.setTargetRecordsCount(data.getCountCheckResult().getTargetRecordsCount());
+ countCheckResult.setMatched(data.getCountCheckResult().isMatched());
+ YamlDataConsistencyCheckResult result = new YamlDataConsistencyCheckResult();
+ result.setCountCheckResult(countCheckResult);
+ YamlDataConsistencyContentCheckResult contentCheckResult = new YamlDataConsistencyContentCheckResult(data.getContentCheckResult().isMatched());
+ result.setContentCheckResult(contentCheckResult);
+ return result;
+ }
+
+ @Override
+ public DataConsistencyCheckResult swapToObject(final YamlDataConsistencyCheckResult yamlConfig) {
+ if (null == yamlConfig) {
+ return null;
+ }
+ YamlDataConsistencyCountCheckResult yamlCountCheck = yamlConfig.getCountCheckResult();
+ DataConsistencyCountCheckResult countCheckResult = new DataConsistencyCountCheckResult(yamlCountCheck.getSourceRecordsCount(), yamlCountCheck.getTargetRecordsCount());
+ DataConsistencyContentCheckResult contentCheckResult = new DataConsistencyContentCheckResult(yamlConfig.getContentCheckResult().isMatched());
+ return new DataConsistencyCheckResult(countCheckResult, contentCheckResult);
+ }
+
+ /**
+ * Swap YAML text to data consistency check result.
+ *
+ * @param parameter YAML text of data consistency check result
+ * @return data consistency check result, or null if the text unmarshals to null
+ */
+ public static DataConsistencyCheckResult swapToObject(final String parameter) {
+ YamlDataConsistencyCheckResult yamlDataCheckResult = YamlEngine.unmarshal(parameter, YamlDataConsistencyCheckResult.class, true);
+ return CONFIG_SWAPPER.swapToObject(yamlDataCheckResult);
+ }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
similarity index 56%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
index d2bbeaa772f..3102c2bbb9d 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
@@ -15,21 +15,44 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.api.config.job;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Properties;
/**
- * Check migration statement.
+ * Consistency check job configuration.
*/
@RequiredArgsConstructor
@Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
+@Slf4j
+@ToString
+public final class ConsistencyCheckJobConfiguration implements PipelineJobConfiguration {
private final String jobId;
- private final AlgorithmSegment typeStrategy;
+ private final String referredJobId;
+
+ private final String algorithmTypeName;
+
+ private final Properties algorithmProperties;
+
+ @Override
+ public String getSourceDatabaseType() {
+ throw new UnsupportedOperationException("");
+ }
+
+ /**
+ * Get job sharding count.
+ *
+ * @return job sharding count
+ */
+ @Override
+ public int getJobShardingCount() {
+ return 1;
+ }
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
similarity index 57%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
index d2bbeaa772f..53ec03e9c42 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
@@ -15,21 +15,34 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Properties;
/**
- * Check migration statement.
+ * Consistency check job configuration for YAML.
*/
-@RequiredArgsConstructor
@Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
+@Setter
+@Slf4j
+@ToString
+public final class YamlConsistencyCheckJobConfiguration implements YamlPipelineJobConfiguration {
+
+ private String jobId;
+
+ private String parentJobId;
+
+ private String algorithmTypeName;
- private final String jobId;
+ private Properties algorithmProperties;
- private final AlgorithmSegment typeStrategy;
+ @Override
+ public String getTargetDatabaseName() {
+ throw new UnsupportedOperationException("");
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
new file mode 100644
index 00000000000..e7e07cdb678
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML consistency check job configuration swapper.
+ */
+public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlConfigurationSwapper<YamlConsistencyCheckJobConfiguration, ConsistencyCheckJobConfiguration> {
+
+ private static final YamlConsistencyCheckJobConfigurationSwapper JOB_CONFIG_SWAPPER = new YamlConsistencyCheckJobConfigurationSwapper();
+
+ @Override
+ public YamlConsistencyCheckJobConfiguration swapToYamlConfiguration(final ConsistencyCheckJobConfiguration data) {
+ YamlConsistencyCheckJobConfiguration result = new YamlConsistencyCheckJobConfiguration();
+ result.setJobId(data.getJobId());
+ result.setParentJobId(data.getReferredJobId());
+ result.setAlgorithmTypeName(data.getAlgorithmTypeName());
+ result.setAlgorithmProperties(data.getAlgorithmProperties());
+ return result;
+ }
+
+ @Override
+ public ConsistencyCheckJobConfiguration swapToObject(final YamlConsistencyCheckJobConfiguration yamlConfig) {
+ if (null == yamlConfig) {
+ return null;
+ }
+ return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(), yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(), yamlConfig.getAlgorithmProperties());
+ }
+
+ /**
+ * Swap to job configuration from text.
+ *
+ * @param jobParameter job parameter
+ * @return job configuration, or null if the job parameter is null or unmarshals to null
+ */
+ public static ConsistencyCheckJobConfiguration swapToObject(final String jobParameter) {
+ YamlConsistencyCheckJobConfiguration yamlJobConfig = null == jobParameter ? null : YamlEngine.unmarshal(jobParameter, YamlConsistencyCheckJobConfiguration.class, true);
+ return JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+ }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
similarity index 69%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
index 47227004a30..dbb318489b4 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
@@ -15,12 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.api.job.progress;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
/**
- * Show migration source resources statement.
+ * Consistency check job progress.
*/
-public class ShowMigrationSourceResourcesStatement extends QueryableScalingRALStatement {
+@Getter
+@Setter
+public final class ConsistencyCheckJobProgress implements PipelineJobItemProgress {
+
+ private JobStatus status = JobStatus.RUNNING;
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateConsistencyCheckJobParameter.java
similarity index 68%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateConsistencyCheckJobParameter.java
index d2bbeaa772f..deaf027368a 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateConsistencyCheckJobParameter.java
@@ -15,21 +15,23 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.api.pojo;
-import lombok.Getter;
+import lombok.Data;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+
+import java.util.Properties;
/**
- * Check migration statement.
+ * Create consistency check job parameter.
*/
+@Data
@RequiredArgsConstructor
-@Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
+public final class CreateConsistencyCheckJobParameter {
private final String jobId;
- private final AlgorithmSegment typeStrategy;
+ private final String algorithmTypeName;
+
+ private final Properties algorithmProps;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index b0c6f88c255..bf47fc38033 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -17,12 +17,13 @@
package org.apache.shardingsphere.data.pipeline.core.api;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import java.util.Collection;
import java.util.List;
-import java.util.Optional;
+import java.util.Map;
/**
* Governance repository API.
@@ -56,44 +57,38 @@ public interface GovernanceRepositoryAPI {
String getJobItemProgress(String jobId, int shardingItem);
/**
- * Persist check latest result.
- *
- * @param jobId job id
- * @param checkSuccess check success
- */
- void persistCheckLatestResult(String jobId, boolean checkSuccess);
-
- /**
- * Get check latest result.
+ * Get check latest job id.
*
* @param jobId job id
- * @return check result
+ * @return check job id
*/
- Optional<Boolean> getCheckLatestResult(String jobId);
+ String getCheckLatestJobId(String jobId);
/**
- * Persist check latest detailed result.
+ * Persist check latest job id.
*
* @param jobId job id
- * @param checkDetailedSuccess check detailed success
+ * @param checkJobId check job id
*/
- void persistCheckLatestDetailedResult(String jobId, String checkDetailedSuccess);
+ void persistCheckLatestJobId(String jobId, String checkJobId);
/**
- * Get check latest detailed result.
+ * Get check job result.
*
* @param jobId job id
- * @return check detailed result
+ * @param checkJobId check job id
+ * @return check job result
*/
- Optional<String> getCheckLatestDetailedResult(String jobId);
+ Map<String, DataConsistencyCheckResult> getCheckJobResult(String jobId, String checkJobId);
/**
- * Persist check job id.
+ * Persist check job result.
*
* @param jobId job id
* @param checkJobId check job id
+ * @param dataConsistencyCheckResult check result
*/
- void persistCheckJobId(String jobId, String checkJobId);
+ void persistCheckJobResult(String jobId, String checkJobId, Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult);
/**
* List check job ids.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index d0be204a7d0..31ce5d6a086 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
@@ -178,7 +177,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
String jobId = jobConfig.getJobId();
Map<String, DataConsistencyCheckResult> result = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculateAlgorithm.getType(), result);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 7a2f7f141a8..3af215a37b2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -17,19 +17,26 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
-import com.google.common.base.Strings;
-import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Optional;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
@@ -57,32 +64,44 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
}
@Override
- public void persistCheckLatestResult(final String jobId, final boolean checkSuccess) {
- log.info("persist check latest result '{}' for job {}", checkSuccess, jobId);
- repository.persist(PipelineMetaDataNode.getCheckLatestResultPath(jobId), String.valueOf(checkSuccess));
+ public String getCheckLatestJobId(final String jobId) {
+ return repository.get(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId));
}
@Override
- public Optional<Boolean> getCheckLatestResult(final String jobId) {
- String data = repository.get(PipelineMetaDataNode.getCheckLatestResultPath(jobId));
- return Strings.isNullOrEmpty(data) ? Optional.empty() : Optional.of(Boolean.parseBoolean(data));
+ public void persistCheckLatestJobId(final String jobId, final String checkJobId) {
+ log.info("persist check job id '{}' for job {}", checkJobId, jobId);
+ repository.persist(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId), String.valueOf(checkJobId));
}
+ @SuppressWarnings("unchecked")
@Override
- public void persistCheckLatestDetailedResult(final String jobId, @NonNull final String checkDetailedSuccess) {
- log.info("persist check latest detailed result, jobId={}, checkDetailedSuccess={}", jobId, checkDetailedSuccess);
- repository.persist(PipelineMetaDataNode.getCheckLatestDetailedResultPath(jobId), checkDetailedSuccess);
+ public Map<String, DataConsistencyCheckResult> getCheckJobResult(final String jobId, final String checkJobId) {
+ Map<String, DataConsistencyCheckResult> result = new HashMap<>();
+ String checkJobText = repository.get(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
+ if (StringUtils.isBlank(checkJobText)) {
+ return Collections.emptyMap();
+ }
+ Map<String, String> checkJobConfigMap = YamlEngine.unmarshal(checkJobText, Map.class, true);
+ for (Entry<String, String> entry : checkJobConfigMap.entrySet()) {
+ result.put(entry.getKey(), YamlDataConsistencyCheckResultSwapper.swapToObject(entry.getValue()));
+ }
+ return result;
}
@Override
- public Optional<String> getCheckLatestDetailedResult(final String jobId) {
- return Optional.ofNullable(repository.get(PipelineMetaDataNode.getCheckLatestDetailedResultPath(jobId)));
- }
-
- @Override
- public void persistCheckJobId(final String jobId, final String checkJobId) {
- log.info("persist check job id, jobId={}, checkJobId={}", jobId, checkJobId);
- repository.persist(PipelineMetaDataNode.getCheckJobIdPath(jobId, checkJobId), "");
+ public void persistCheckJobResult(final String jobId, final String checkJobId, final Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult) {
+ if (null == dataConsistencyCheckResult) {
+ log.warn("data consistency check is null, jobId {}, checkJobId {}", jobId, checkJobId);
+ return;
+ }
+ log.info("persist check job result '{}' for job {}", checkJobId, jobId);
+ Map<String, String> checkResultMap = new LinkedHashMap<>();
+ for (Entry<String, DataConsistencyCheckResult> entry : dataConsistencyCheckResult.entrySet()) {
+ YamlDataConsistencyCheckResult checkResult = new YamlDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
+ checkResultMap.put(entry.getKey(), YamlEngine.marshal(checkResult));
+ }
+ repository.persist(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId), YamlEngine.marshal(checkResultMap));
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
index a73e812ac52..1a5c7cf30dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpe
*/
public final class PipelineJobExecutionException extends PipelineSQLException {
- private static final long serialVersionUID = -5530453461378051166L;
+ private static final long serialVersionUID = -8462847591661221914L;
public PipelineJobExecutionException(final String taskId, final Throwable cause) {
super(XOpenSQLState.GENERAL_ERROR, 94, "Task `%s` execute failed", taskId, cause.getMessage());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
similarity index 73%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
index a73e812ac52..dff0e6a1905 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
/**
- * Pipeline job execution exception.
+ * Pipeline job has already existed exception.
*/
-public final class PipelineJobExecutionException extends PipelineSQLException {
+public final class PipelineJobHasAlreadyExistedException extends PipelineSQLException {
- private static final long serialVersionUID = -5530453461378051166L;
+ private static final long serialVersionUID = 2854259384634892428L;
- public PipelineJobExecutionException(final String taskId, final Throwable cause) {
- super(XOpenSQLState.GENERAL_ERROR, 94, "Task `%s` execute failed", taskId, cause.getMessage());
+ public PipelineJobHasAlreadyExistedException(final String jobId) {
+ super(XOpenSQLState.GENERAL_ERROR, 86, "Job `%s` has already existed", jobId);
}
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
similarity index 72%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
index 47227004a30..05dd63bb70c 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
@@ -15,12 +15,16 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import lombok.Data;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * Show migration source resources statement.
+ * YAML consistency check job progress.
*/
-public class ShowMigrationSourceResourcesStatement extends QueryableScalingRALStatement {
+@Data
+public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
+
+ private String status;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
new file mode 100644
index 00000000000..8796af3c6d8
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML consistency check job progress swapper.
+ */
+public final class YamlConsistencyCheckJobProgressSwapper implements YamlConfigurationSwapper<YamlConsistencyCheckJobProgress, ConsistencyCheckJobProgress> {
+
+ @Override
+ public YamlConsistencyCheckJobProgress swapToYamlConfiguration(final ConsistencyCheckJobProgress data) {
+ YamlConsistencyCheckJobProgress result = new YamlConsistencyCheckJobProgress();
+ result.setStatus(data.getStatus().name());
+ return result;
+ }
+
+ @Override
+ public ConsistencyCheckJobProgress swapToObject(final YamlConsistencyCheckJobProgress yamlConfig) {
+ ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
+ result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
+ return result;
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 3d459741913..3508ac9b71d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -30,7 +30,7 @@ import java.util.regex.Pattern;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineMetaDataNode {
- private static final String JOB_PATTERN_PREFIX = DataPipelineConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-f]+)";
+ private static final String JOB_PATTERN_PREFIX = DataPipelineConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-z]+)";
public static final Pattern CONFIG_PATTERN = Pattern.compile(JOB_PATTERN_PREFIX + "/config");
@@ -120,23 +120,24 @@ public final class PipelineMetaDataNode {
}
/**
- * Get check latest result path.
+ * Get check latest job id path.
*
* @param jobId job id
- * @return check latest result path
+ * @return check latest job id path
*/
- public static String getCheckLatestResultPath(final String jobId) {
- return String.join("/", getJobRootPath(jobId), "check", "latest_result");
+ public static String getCheckLatestJobIdPath(final String jobId) {
+ return String.join("/", getJobRootPath(jobId), "check", "latest_job_id");
}
/**
- * Get check latest detailed result path.
+ * Get check job result path.
*
* @param jobId job id
- * @return check latest detailed result path
+ * @param checkJobId check job id
+ * @return check job result path
*/
- public static String getCheckLatestDetailedResultPath(final String jobId) {
- return String.join("/", getJobRootPath(jobId), "check", "latest_detailed_result");
+ public static String getCheckJobResultPath(final String jobId, final String checkJobId) {
+ return String.join("/", getCheckJobIdsRootPath(jobId), checkJobId);
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
new file mode 100644
index 00000000000..ea606b8e296
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Consistency check job configuration changed processor.
+ */
+@Slf4j
+public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
+
+ @Override
+ public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
+ String jobId = jobConfigPOJO.getJobName();
+ if (jobConfigPOJO.isDisabled()) {
+ log.info("{} is disabled", jobId);
+ PipelineJobCenter.stop(jobId);
+ return;
+ }
+ switch (eventType) {
+ case ADDED:
+ case UPDATED:
+ if (PipelineJobCenter.isJobExisting(jobId)) {
+ log.info("{} added to executing jobs failed since it already exists", jobId);
+ } else {
+ log.info("{} executing jobs", jobId);
+ CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+ if (null != throwable) {
+ log.error("execute failed, jobId={}", jobId, throwable);
+ }
+ });
+ }
+ break;
+ case DELETED:
+ log.info("deleted consistency check job id: {}", jobId);
+ PipelineJobCenter.stop(jobId);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void execute(final JobConfigurationPOJO jobConfigPOJO) {
+ ConsistencyCheckJob job = new ConsistencyCheckJob();
+ PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+ OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
+ job.setOneOffJobBootstrap(oneOffJobBootstrap);
+ oneOffJobBootstrap.execute();
+ }
+
+ @Override
+ public String getType() {
+ return JobType.CONSISTENCY_CHECK.getTypeName();
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
new file mode 100644
index 00000000000..14cb1c04245
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+
+import java.util.Map;
+
+/**
+ * Consistency check job.
+ */
+@Slf4j
+public final class ConsistencyCheckJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
+
+ private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+
+ private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
+
+ @Override
+ public void execute(final ShardingContext shardingContext) {
+ String checkJobId = shardingContext.getJobName();
+ setJobId(checkJobId);
+ ConsistencyCheckJobConfiguration consistencyCheckJobConfig = YamlConsistencyCheckJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
+ ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, JobStatus.FINISHED);
+ jobAPI.persistJobItemProgress(jobItemContext);
+ String referredJobId = consistencyCheckJobConfig.getReferredJobId();
+ log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, referredJobId);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(referredJobId, checkJobId);
+ JobType jobType = PipelineJobIdUtils.parseJobType(referredJobId);
+ InventoryIncrementalJobPublicAPI jobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult;
+ if (StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())) {
+ dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(referredJobId);
+ } else {
+ dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(referredJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProperties());
+ }
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(referredJobId, checkJobId, dataConsistencyCheckResult);
+ jobItemContext.setStatus(JobStatus.FINISHED);
+ jobAPI.persistJobItemProgress(jobItemContext);
+ jobAPI.stop(checkJobId);
+ log.info("execute consistency check job finished, job id:{}, referred job id:{}", checkJobId, referredJobId);
+ }
+
+ @Override
+ public void stop() {
+ setStopping(true);
+ if (null != getOneOffJobBootstrap()) {
+ getOneOffJobBootstrap().shutdown();
+ }
+ if (null == getJobId()) {
+ log.info("stop consistency check job, jobId is null, ignore");
+ return;
+ }
+ String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
+ pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 0);
+ }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
similarity index 64%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
index 47227004a30..dca95957b5c 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
@@ -15,12 +15,15 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
/**
- * Show migration source resources statement.
+ * Consistency check job API.
*/
-public class ShowMigrationSourceResourcesStatement extends QueryableScalingRALStatement {
+public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI, PipelineJobAPI, PipelineJobItemAPI {
+
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactory.java
similarity index 55%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactory.java
index a73e812ac52..858d79a60b8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactory.java
@@ -15,19 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.exception.job;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
-import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
/**
- * Pipeline job execution exception.
+ * Consistency check job API factory.
*/
-public final class PipelineJobExecutionException extends PipelineSQLException {
+public final class ConsistencyCheckJobAPIFactory {
- private static final long serialVersionUID = -5530453461378051166L;
+ static {
+ ShardingSphereServiceLoader.register(ConsistencyCheckJobAPI.class);
+ }
- public PipelineJobExecutionException(final String taskId, final Throwable cause) {
- super(XOpenSQLState.GENERAL_ERROR, 94, "Task `%s` execute failed", taskId, cause.getMessage());
+ /**
+ * Get instance of consistency check job API.
+ *
+ * @return got instance
+ */
+ public static ConsistencyCheckJobAPI getInstance() {
+ return RequiredSPIRegistry.getRegisteredService(ConsistencyCheckJobAPI.class);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
new file mode 100644
index 00000000000..1512158497c
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Consistency check job API impl.
+ */
+@Slf4j
+public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl implements ConsistencyCheckJobAPI {
+
+ private static final YamlConsistencyCheckJobProgressSwapper PROGRESS_SWAPPER = new YamlConsistencyCheckJobProgressSwapper();
+
+ @Override
+ protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
+ ConsistencyCheckJobId jobId = (ConsistencyCheckJobId) pipelineJobId;
+ return jobId.getPipelineJobId() + jobId.getSequence();
+ }
+
+ @Override
+ public String createJobAndStart(final CreateConsistencyCheckJobParameter parameter) {
+ GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
+ String checkLatestJobId = repositoryAPI.getCheckLatestJobId(parameter.getJobId());
+ if (StringUtils.isNotBlank(checkLatestJobId)) {
+ PipelineJobItemProgress progress = getJobItemProgress(checkLatestJobId, 0);
+ if (null != progress && JobStatus.FINISHED != progress.getStatus()) {
+ log.info("check job already existed and status isn't FINISHED, status {}", progress.getStatus());
+ throw new PipelineJobHasAlreadyExistedException(checkLatestJobId);
+ }
+ }
+ int consistencyCheckVersionNew = null == checkLatestJobId ? 0 : ConsistencyCheckJobId.getSequence(checkLatestJobId) + 1;
+ YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration();
+ ConsistencyCheckJobId checkJobId = new ConsistencyCheckJobId(parameter.getJobId(), consistencyCheckVersionNew);
+ String result = marshalJobId(checkJobId);
+ yamlConfig.setJobId(result);
+ yamlConfig.setParentJobId(parameter.getJobId());
+ yamlConfig.setAlgorithmTypeName(parameter.getAlgorithmTypeName());
+ yamlConfig.setAlgorithmProperties(parameter.getAlgorithmProps());
+ ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig);
+ start(jobConfig);
+ return result;
+ }
+
+ @Override
+ public Map<String, DataConsistencyCheckResult> getLatestDataConsistencyCheckResult(final String jobId) {
+ String checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(jobId);
+ if (StringUtils.isBlank(checkLatestJobId)) {
+ return Collections.emptyMap();
+ }
+ return PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(jobId, checkLatestJobId);
+ }
+
+ @Override
+ public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+ ConsistencyCheckJobProgress jobProgress = new ConsistencyCheckJobProgress();
+ jobProgress.setStatus(jobItemContext.getStatus());
+ YamlConsistencyCheckJobProgress yamlJobProgress = PROGRESS_SWAPPER.swapToYamlConfiguration(jobProgress);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
+ }
+
+ @Override
+ public PipelineJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+ String progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+ if (StringUtils.isBlank(progress)) {
+ return null;
+ }
+ ConsistencyCheckJobProgress jobProgress = PROGRESS_SWAPPER.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobProgress.class, true));
+ ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
+ result.setStatus(jobProgress.getStatus());
+ return result;
+ }
+
+ @Override
+ public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
+ ConsistencyCheckJobProgress jobProgress = (ConsistencyCheckJobProgress) getJobItemProgress(jobId, shardingItem);
+ if (null == jobProgress) {
+ log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
+ return;
+ }
+ jobProgress.setStatus(status);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(PROGRESS_SWAPPER.swapToYamlConfiguration(jobProgress)));
+ }
+
+ @Override
+ public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) {
+ return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ }
+
+ @Override
+ protected ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
+ return YamlConsistencyCheckJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
+ }
+
+ @Override
+ protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
+ return new YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration((ConsistencyCheckJobConfiguration) jobConfig);
+ }
+
+ @Override
+ public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
+ }
+
+ @Override
+ public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+ return null;
+ }
+
+ @Override
+ public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
+ return null;
+ }
+
+ @Override
+ protected PipelineJobInfo getJobInfo(final String jobId) {
+ return null;
+ }
+
+ @Override
+ protected String getJobClassName() {
+ return ConsistencyCheckJob.class.getName();
+ }
+
+ @Override
+ public JobType getJobType() {
+ return JobType.CONSISTENCY_CHECK;
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index c5bc8680f5c..623feb8469c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -23,9 +23,6 @@ import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
-import java.time.Instant;
-import java.time.format.DateTimeFormatter;
-
/**
* Consistency check job id.
*/
@@ -35,15 +32,30 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
public static final String CURRENT_VERSION = "01";
- private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+ private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
private final String pipelineJobId;
- private final String createTimeMinutes;
+ private final int sequence;
- public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final long createTimeMillis) {
+ public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final int sequence) {
super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
this.pipelineJobId = pipelineJobId;
- this.createTimeMinutes = DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(createTimeMillis));
+ if (sequence > MAX_CONSISTENCY_CHECK_VERSION) {
+ this.sequence = 0;
+ } else {
+ this.sequence = sequence;
+ }
+ }
+
+ /**
+ * Get sequence from consistency check job id.
+ *
+ * @param consistencyCheckJobId consistency check job id, whose last character is parsed as the sequence
+ * @return sequence parsed from the last character of the job id
+ */
+ public static int getSequence(final @NonNull String consistencyCheckJobId) {
+ String versionString = consistencyCheckJobId.substring(consistencyCheckJobId.length() - 1);
+ return Integer.parseInt(versionString);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
new file mode 100644
index 00000000000..b3471ac39b4
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+
+/**
+ * Consistency check job item context, holding the job configuration, job id, sharding item, data source name, stopping flag and status of one job item.
+ */
+@Getter
+@Setter
+@Slf4j
+public final class ConsistencyCheckJobItemContext implements PipelineJobItemContext {
+
+ private final String jobId;
+
+ private final int shardingItem;
+
+ private String dataSourceName;
+
+ private volatile boolean stopping;
+
+ private volatile JobStatus status;
+
+ private final ConsistencyCheckJobConfiguration jobConfig;
+
+ public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status) {
+ this.jobConfig = jobConfig;
+ jobId = jobConfig.getJobId(); // job id is taken from the job configuration
+ this.shardingItem = shardingItem;
+ this.status = status;
+ }
+
+ @Override
+ public PipelineProcessContext getJobProcessContext() {
+ throw new UnsupportedOperationException("Consistency check job item context does not provide a process context"); // non-empty message so the failure is diagnosable from logs
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
similarity index 89%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
index 90857b277c4..4c63097dd84 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
index 90857b277c4..141ea167a06 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
@@ -16,3 +16,4 @@
#
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
index 89ef92e051e..2f4035228af 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
@@ -16,3 +16,4 @@
#
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationChangedJobConfigurationProcessor
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckChangedJobConfigurationProcessor
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI
similarity index 89%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI
index 90857b277c4..4c63097dd84 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
index 28683d1da23..a0681f81d4a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
@@ -71,13 +71,13 @@ public final class PipelineMetaDataNodeTest {
}
@Test
- public void assertGetCheckLatestResultPath() {
- assertThat(PipelineMetaDataNode.getCheckLatestResultPath(jobId), is(jobCheckRootPath + "/latest_result"));
+ public void assertGetCheckLatestJobIdPath() {
+ assertThat(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId), is(jobCheckRootPath + "/latest_job_id"));
}
@Test
- public void assertGetCheckLatestDetailedResultPath() {
- assertThat(PipelineMetaDataNode.getCheckLatestDetailedResultPath(jobId), is(jobCheckRootPath + "/latest_detailed_result"));
+ public void assertGetCheckJobResultPath() {
+ assertThat(PipelineMetaDataNode.getCheckJobResultPath(jobId, "j02fx123"), is(jobCheckRootPath + "/job_ids/j02fx123"));
}
@Test
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 445fe1a5492..b46a70d537c 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.integration.data.pipeline.cases.migration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseITCase;
import org.apache.shardingsphere.integration.data.pipeline.command.MigrationDistSQLCommand;
import org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
@@ -28,9 +29,11 @@ import org.opengauss.util.PSQLException;
import javax.xml.bind.JAXB;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
@@ -159,10 +162,20 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
return jobList.stream().filter(a -> a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new RuntimeException("not find " + tableName + " table")).get("id").toString();
}
- protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) {
- List<Map<String, Object>> checkJobResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType));
+ protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) throws SQLException {
+ proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
+ List<Map<String, Object>> checkJobResults = Collections.emptyList();
+ for (int i = 0; i < 10; i++) { // poll the check status up to 10 times, sleeping 3 seconds after each empty result
+ checkJobResults = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
+ if (null != checkJobResults && !checkJobResults.isEmpty()) {
+ break;
+ }
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
+ }
+ assertTrue(null != checkJobResults && !checkJobResults.isEmpty()); // fail if polling ended without any status rows
log.info("check job results: {}", checkJobResults);
for (Map<String, Object> entry : checkJobResults) {
+ assertTrue(Boolean.parseBoolean(entry.get("records_count_matched").toString()));
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
}
}
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/QueryableScalingRALStatementAssert.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/QueryableScalingRALStatementAssert.java
index 9e1bdfa93e5..0f62fe8f217 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/QueryableScalingRALStatementAssert.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/QueryableScalingRALStatementAssert.java
@@ -20,20 +20,20 @@ package org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statemen
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
-import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceResourcesStatement;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.SQLCaseAssertContext;
-import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.CheckMigrationStatementAssert;
+import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.ShowMigrationCheckStatusStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.ShowMigrationCheckAlgorithmsStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.ShowMigrationListStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.ShowMigrationSourceResourceStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.ShowMigrationStatusStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.SQLParserTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ShowMigrationListStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CheckMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationCheckStatusStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationCheckAlgorithmsStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationSourceResourcesStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationStatusStatementTestCase;
@@ -57,8 +57,8 @@ public final class QueryableScalingRALStatementAssert {
ShowMigrationListStatementAssert.assertIs(assertContext, (ShowMigrationListStatement) actual, (ShowMigrationListStatementTestCase) expected);
} else if (actual instanceof ShowMigrationCheckAlgorithmsStatement) {
ShowMigrationCheckAlgorithmsStatementAssert.assertIs(assertContext, (ShowMigrationCheckAlgorithmsStatement) actual, (ShowMigrationCheckAlgorithmsStatementTestCase) expected);
- } else if (actual instanceof CheckMigrationStatement) {
- CheckMigrationStatementAssert.assertIs(assertContext, (CheckMigrationStatement) actual, (CheckMigrationStatementTestCase) expected);
+ } else if (actual instanceof ShowMigrationCheckStatusStatement) {
+ ShowMigrationCheckStatusStatementAssert.assertIs(assertContext, (ShowMigrationCheckStatusStatement) actual, (ShowMigrationCheckStatusStatementTestCase) expected);
} else if (actual instanceof ShowMigrationStatusStatement) {
ShowMigrationStatusStatementAssert.assertIs(assertContext, (ShowMigrationStatusStatement) actual, (ShowMigrationStatusStatementTestCase) expected);
} else if (actual instanceof ShowMigrationSourceResourcesStatement) {
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/UpdatableScalingRALStatementAssert.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/UpdatableScalingRALStatementAssert.java
index 808eac96ac6..e1dd898baad 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/UpdatableScalingRALStatementAssert.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/UpdatableScalingRALStatementAssert.java
@@ -22,28 +22,31 @@ import lombok.NoArgsConstructor;
import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
import org.apache.shardingsphere.distsql.parser.statement.ral.updatable.DropPipelineProcessConfigurationStatement;
import org.apache.shardingsphere.migration.distsql.statement.AddMigrationSourceResourceStatement;
-import org.apache.shardingsphere.migration.distsql.statement.RollbackMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
import org.apache.shardingsphere.migration.distsql.statement.DropMigrationSourceResourceStatement;
import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
-import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.RollbackMigrationStatement;
import org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
import org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.SQLCaseAssertContext;
+import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.CheckMigrationStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.AddMigrationSourceResourceStatementAssert;
-import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.RollbackMigrationStatementAssert;
-import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.DropPipelineProcessConfigurationStatementAssert;
+import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.CommitMigrationStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.DropMigrationSourceResourceStatementAssert;
+import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.DropPipelineProcessConfigurationStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.MigrateTableStatementAssert;
-import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.CommitMigrationStatementAssert;
+import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.RollbackMigrationStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.StartMigrationStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.StopMigrationStatementAssert;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.SQLParserTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.AddMigrationSourceResourceStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.RollbackMigrationStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropPipelineProcessConfigurationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CheckMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CommitMigrationStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropMigrationSourceResourceStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropPipelineProcessConfigurationStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.MigrateTableStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CommitMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.RollbackMigrationStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.StartMigrationStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.StopMigrationStatementTestCase;
@@ -79,6 +82,8 @@ public final class UpdatableScalingRALStatementAssert {
AddMigrationSourceResourceStatementAssert.assertIs(assertContext, (AddMigrationSourceResourceStatement) actual, (AddMigrationSourceResourceStatementTestCase) expected);
} else if (actual instanceof DropMigrationSourceResourceStatement) {
DropMigrationSourceResourceStatementAssert.assertIs(assertContext, (DropMigrationSourceResourceStatement) actual, (DropMigrationSourceResourceStatementTestCase) expected);
+ } else if (actual instanceof CheckMigrationStatement) {
+ CheckMigrationStatementAssert.assertIs(assertContext, (CheckMigrationStatement) actual, (CheckMigrationStatementTestCase) expected);
}
}
}
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/ShowMigrationCheckStatusStatementAssert.java
similarity index 64%
copy from shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java
copy to shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/ShowMigrationCheckStatusStatementAssert.java
index d485086a5ca..f6ca28bf698 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/ShowMigrationCheckStatusStatementAssert.java
@@ -17,38 +17,35 @@
package org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.SQLCaseAssertContext;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.segment.impl.distsql.ExpectedAlgorithm;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CheckMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationCheckStatusStatementTestCase;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.hamcrest.MatcherAssert.assertThat;
/**
- * Check migration statement assert.
+ * Show migration check status statement assert.
*/
-public final class CheckMigrationStatementAssert {
+public final class ShowMigrationCheckStatusStatementAssert {
/**
- * Assert check migration statement is correct with expected parser result.
+ * Assert migration check status statement is correct with expected parser result.
*
* @param assertContext assert context
* @param actual actual check migration statement
* @param expected expected check migration statement test case
*/
- public static void assertIs(final SQLCaseAssertContext assertContext, final CheckMigrationStatement actual, final CheckMigrationStatementTestCase expected) {
+ public static void assertIs(final SQLCaseAssertContext assertContext, final ShowMigrationCheckStatusStatement actual, final ShowMigrationCheckStatusStatementTestCase expected) {
if (null == expected) {
assertNull(assertContext.getText("Actual statement should not exist."), actual);
} else {
assertNotNull(assertContext.getText("Actual statement should exist."), actual);
assertJobIds(assertContext, actual.getJobId(), expected.getJobIds());
- assertTypeStrategy(assertContext, actual.getTypeStrategy(), expected.getTableStrategies());
}
}
@@ -60,13 +57,4 @@ public final class CheckMigrationStatementAssert {
assertThat(assertContext.getText("Job id assertion error"), actual, is(expected.iterator().next()));
}
}
-
- private static void assertTypeStrategy(final SQLCaseAssertContext assertContext, final AlgorithmSegment actual, final List<ExpectedAlgorithm> expected) {
- if (expected.isEmpty()) {
- assertNull(assertContext.getText("Actual type strategy should not exist."), actual);
- } else {
- assertNotNull(assertContext.getText("Actual type strategy should exist."), actual);
- assertThat(assertContext.getText("Type strategy assertion error"), actual.getName(), is(expected.iterator().next().getName()));
- }
- }
}
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/update/CheckMigrationStatementAssert.java
similarity index 98%
rename from shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java
rename to shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/update/CheckMigrationStatementAssert.java
index d485086a5ca..558e88b04f6 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/update/CheckMigrationStatementAssert.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query;
+package org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
index cbf4cb1fec5..d8eba88a095 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
@@ -286,11 +286,11 @@ import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ClearHintStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ClearReadwriteSplittingHintStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ClearShardingHintStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ConvertYamlConfigurationStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.CreateTrafficRuleStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.DiscardDistSQLStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.DropTrafficRuleStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ExportDatabaseConfigurationStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ConvertYamlConfigurationStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ImportDatabaseConfigurationStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.LabelInstanceStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.PrepareDistSQLStatementTestCase;
@@ -316,11 +316,12 @@ import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.UnlabelInstanceStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.AddMigrationSourceResourceStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CheckMigrationStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.RollbackMigrationStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropPipelineProcessConfigurationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CommitMigrationStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropMigrationSourceResourceStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropPipelineProcessConfigurationStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.MigrateTableStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CommitMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.RollbackMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationCheckStatusStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationCheckAlgorithmsStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationSourceResourcesStatementTestCase;
import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationStatusStatementTestCase;
@@ -1015,6 +1016,9 @@ public final class SQLParserTestCases {
@XmlElement(name = "show-migration-status")
private final List<ShowMigrationStatusStatementTestCase> showMigrationStatusTestCases = new LinkedList<>();
+ @XmlElement(name = "show-migration-check-status")
+ private final List<ShowMigrationCheckStatusStatementTestCase> showMigrationCheckStatusTestCases = new LinkedList<>();
+
@XmlElement(name = "show-migration-check-algorithms")
private final List<ShowMigrationCheckAlgorithmsStatementTestCase> showMigrationCheckAlgorithmsStatementTestCases = new LinkedList<>();
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/statement/distsql/ral/migration/ShowMigrationCheckStatusStatementTestCase.java
similarity index 60%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
copy to shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/statement/distsql/ral/migration/ShowMigrationCheckStatusStatementTestCase.java
index d2bbeaa772f..2db4fa294b7 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/statement/distsql/ral/migration/ShowMigrationCheckStatusStatementTestCase.java
@@ -15,21 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.SQLParserTestCase;
+
+import javax.xml.bind.annotation.XmlElement;
+import java.util.LinkedList;
+import java.util.List;
/**
- * Check migration statement.
+ * Show migration check status statement test case.
*/
-@RequiredArgsConstructor
@Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
-
- private final String jobId;
+public final class ShowMigrationCheckStatusStatementTestCase extends SQLParserTestCase {
- private final AlgorithmSegment typeStrategy;
+ @XmlElement(name = "job-id")
+ private final List<String> jobIds = new LinkedList<>();
}
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/resources/case/ral/migration.xml b/shardingsphere-test/shardingsphere-parser-test/src/main/resources/case/ral/migration.xml
index 77aae07035d..36d8995dd40 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/resources/case/ral/migration.xml
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/resources/case/ral/migration.xml
@@ -38,6 +38,10 @@
<job-id>123</job-id>
</show-migration-status>
+ <show-migration-check-status sql-case-id="show-migration-check-status">
+ <job-id>123</job-id>
+ </show-migration-check-status>
+
<drop-migration-process-configuration sql-case-id="drop-migration-process-configuration-read">
<conf-path>/READ</conf-path>
</drop-migration-process-configuration>
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/resources/sql/supported/ral/migration.xml b/shardingsphere-test/shardingsphere-parser-test/src/main/resources/sql/supported/ral/migration.xml
index 5586573b7ac..39aa309a54c 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/resources/sql/supported/ral/migration.xml
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/resources/sql/supported/ral/migration.xml
@@ -22,6 +22,7 @@
<distsql-case id="show-migration-check-algorithms" value="SHOW MIGRATION CHECK ALGORITHMS;" />
<distsql-case id="check-migration" value="CHECK MIGRATION 123;" />
<distsql-case id="show-migration-status" value="SHOW MIGRATION STATUS 123;" />
+ <distsql-case id="show-migration-check-status" value="SHOW MIGRATION CHECK STATUS 123;" />
<distsql-case id="check-migration-with-type" value="CHECK MIGRATION 123 by TYPE(name='DEFAULT', PROPERTIES('test-property'='4'));" />
<distsql-case id="drop-migration-process-configuration-read" value="DROP MIGRATION PROCESS CONFIGURATION '/READ';" />
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
index 649fae2ed52..136871be3c2 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
import org.junit.Test;
@@ -31,4 +32,9 @@ public final class PipelineAPIFactoryTest {
public void assertGetPipelineJobAPI() {
assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION), instanceOf(MigrationJobAPIImpl.class));
}
+
+ @Test
+ public void assertGetConsistencyCheckJobAPI() {
+ assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK), instanceOf(ConsistencyCheckJobAPIImpl.class));
+ }
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index 912a9ebf2c2..21d62ecb150 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
import org.junit.Test;
@@ -43,4 +44,9 @@ public final class PipelineJobPublicAPIFactoryTest {
public void assertGetMigrationJobPublicAPI() {
assertThat(PipelineJobPublicAPIFactory.getMigrationJobPublicAPI(), instanceOf(MigrationJobAPIImpl.class));
}
+
+ @Test
+ public void assertGetConsistencyCheckJobPublicAPI() {
+ assertThat(PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI(), instanceOf(ConsistencyCheckJobAPIImpl.class));
+ }
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index af4e96eb5a1..d435b88413f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -17,6 +17,9 @@
package org.apache.shardingsphere.data.pipeline.api.impl;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -40,18 +43,19 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.Types;
+import java.util.HashMap;
import java.util.List;
-import java.util.Optional;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -76,9 +80,12 @@ public final class GovernanceRepositoryAPIImplTest {
@Test
public void assertPersistJobCheckResult() {
MigrationJobItemContext jobItemContext = mockJobItemContext();
- governanceRepositoryAPI.persistCheckLatestResult(jobItemContext.getJobId(), true);
- Optional<Boolean> checkResult = governanceRepositoryAPI.getCheckLatestResult(jobItemContext.getJobId());
- assertTrue(checkResult.isPresent() && checkResult.get());
+ Map<String, DataConsistencyCheckResult> actual = new HashMap<>();
+ actual.put("test", new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(1, 1), new DataConsistencyContentCheckResult(true)));
+ governanceRepositoryAPI.persistCheckJobResult(jobItemContext.getJobId(), "j02123", actual);
+ Map<String, DataConsistencyCheckResult> checkResult = governanceRepositoryAPI.getCheckJobResult(jobItemContext.getJobId(), "j02123");
+ assertThat(checkResult.size(), is(1));
+ assertTrue(checkResult.get("test").getContentCheckResult().isMatched());
}
@Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
new file mode 100644
index 00000000000..65907b3c29a
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
+
+public final class ConsistencyCheckJobAPIImplTest {
+
+ private static ConsistencyCheckJobAPI jobAPI;
+
+ @BeforeClass
+ public static void beforeClass() {
+ PipelineContextUtil.mockModeConfigAndContextManager();
+ jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+ }
+
+ @Test
+ public void assertCreateJobConfig() {
+ String migrationJobId = "j0101test";
+ CreateConsistencyCheckJobParameter parameter = new CreateConsistencyCheckJobParameter(migrationJobId, null, null);
+ String checkJobId = jobAPI.createJobAndStart(parameter);
+ ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) jobAPI.getJobConfiguration(checkJobId);
+ String expectCheckJobId = "j0201j0101test0";
+ assertThat(jobConfig.getJobId(), is(expectCheckJobId));
+ assertNull(jobConfig.getAlgorithmTypeName());
+ int consistencyCheckVersion = ConsistencyCheckJobId.getSequence(expectCheckJobId);
+ assertThat(consistencyCheckVersion, is(0));
+ }
+}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 9770d830787..0f240d13966 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -31,8 +31,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -63,10 +61,10 @@ import java.util.Objects;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
@Slf4j
@@ -222,10 +220,8 @@ public final class MigrationJobAPIImplTest {
final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
Optional<String> jobId = jobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
- GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
MigrationJobItemContext jobItemContext = PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
jobAPI.persistJobItemProgress(jobItemContext);
- repositoryAPI.persistCheckLatestResult(jobId.get(), true);
jobAPI.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
Map<Integer, InventoryIncrementalJobItemProgress> progress = jobAPI.getJobProgress(jobId.get());
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : progress.entrySet()) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java
similarity index 66%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
copy to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java
index 649fae2ed52..a8cf8eeda01 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
-public final class PipelineAPIFactoryTest {
+public final class ConsistencyCheckJobAPIFactoryTest {
@Test
- public void assertGetPipelineJobAPI() {
- assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION), instanceOf(MigrationJobAPIImpl.class));
+ public void assertGetInstance() {
+ assertThat(ConsistencyCheckJobAPIFactory.getInstance(), instanceOf(ConsistencyCheckJobAPIImpl.class));
}
}