You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/27 05:21:39 UTC

[shardingsphere] branch master updated: Refactor check migration, change to use async job (#21194)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 098c08d153d Refactor check migration, change to use async job (#21194)
098c08d153d is described below

commit 098c08d153db09fdd080b212a9425839fc0eee46
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Sep 27 13:21:29 2022 +0800

    Refactor check migration, change to use async job (#21194)
    
    * Refactor check migration, change to use async job
    
    * Remove unused prop
    
    * Fix merge compiler error
    
    * Fix codestyle
    
    * Change param description
    
    * Update class name
    
    * Improve
---
 ...=> ShowMigrationCheckStatusQueryResultSet.java} |  44 +++---
 .../handler/update/CheckMigrationJobUpdater.java   |  49 ++++++
 ...dingsphere.infra.distsql.query.DistSQLResultSet |   2 +-
 ....shardingsphere.infra.distsql.update.RALUpdater |   1 +
 .../main/antlr4/imports/migration/RALStatement.g4  |   4 +
 .../parser/autogen/MigrationDistSQLStatement.g4    |   1 +
 .../core/MigrationDistSQLStatementVisitor.java     |   7 +
 .../distsql/statement/CheckMigrationStatement.java |   4 +-
 ...java => ShowMigrationCheckStatusStatement.java} |   7 +-
 .../ShowMigrationSourceResourcesStatement.java     |   2 +-
 .../pipeline/api/ConsistencyCheckJobPublicAPI.java |  48 ++++++
 .../pipeline/api/PipelineJobPublicAPIFactory.java  |  10 ++
 .../yaml/YamlDataConsistencyCheckResult.java       |  62 ++++++++
 .../YamlDataConsistencyCheckResultSwapper.java     |  69 +++++++++
 .../job/ConsistencyCheckJobConfiguration.java      |  35 ++++-
 .../yaml/YamlConsistencyCheckJobConfiguration.java |  31 ++--
 ...amlConsistencyCheckJobConfigurationSwapper.java |  59 +++++++
 .../job/progress/ConsistencyCheckJobProgress.java  |  14 +-
 .../pojo/CreateConsistencyCheckJobParameter.java   |  18 ++-
 .../pipeline/core/api/GovernanceRepositoryAPI.java |  35 ++---
 .../AbstractInventoryIncrementalJobAPIImpl.java    |   2 -
 .../core/api/impl/GovernanceRepositoryAPIImpl.java |  59 ++++---
 .../job/PipelineJobExecutionException.java         |   2 +-
 ... => PipelineJobHasAlreadyExistedException.java} |  10 +-
 .../yaml/YamlConsistencyCheckJobProgress.java      |  12 +-
 .../YamlConsistencyCheckJobProgressSwapper.java    |  42 +++++
 .../core/metadata/node/PipelineMetaDataNode.java   |  19 +--
 ...tencyCheckChangedJobConfigurationProcessor.java |  81 ++++++++++
 .../consistencycheck/ConsistencyCheckJob.java      |  88 +++++++++++
 .../consistencycheck/ConsistencyCheckJobAPI.java   |  11 +-
 .../ConsistencyCheckJobAPIFactory.java}            |  23 ++-
 .../ConsistencyCheckJobAPIImpl.java                | 172 +++++++++++++++++++++
 .../consistencycheck/ConsistencyCheckJobId.java    |  26 +++-
 .../ConsistencyCheckJobItemContext.java            |  59 +++++++
 ...data.pipeline.api.ConsistencyCheckJobPublicAPI} |   2 +-
 ...ingsphere.data.pipeline.core.api.PipelineJobAPI |   1 +
 ...andler.PipelineChangedJobConfigurationProcessor |   1 +
 ...enario.consistencycheck.ConsistencyCheckJobAPI} |   2 +-
 .../metadata/node/PipelineMetaDataNodeTest.java    |   8 +-
 .../cases/migration/AbstractMigrationITCase.java   |  17 +-
 .../QueryableScalingRALStatementAssert.java        |  10 +-
 .../UpdatableScalingRALStatementAssert.java        |  21 ++-
 ...> ShowMigrationCheckStatusStatementAssert.java} |  26 +---
 .../CheckMigrationStatementAssert.java             |   2 +-
 .../jaxb/cases/domain/SQLParserTestCases.java      |  12 +-
 .../ShowMigrationCheckStatusStatementTestCase.java |  20 +--
 .../src/main/resources/case/ral/migration.xml      |   4 +
 .../main/resources/sql/supported/ral/migration.xml |   1 +
 .../data/pipeline/api/PipelineAPIFactoryTest.java  |   6 +
 .../api/PipelineJobPublicAPIFactoryTest.java       |   6 +
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  17 +-
 .../api/impl/ConsistencyCheckJobAPIImplTest.java   |  55 +++++++
 .../core/api/impl/MigrationJobAPIImplTest.java     |   6 +-
 .../ConsistencyCheckJobAPIFactoryTest.java}        |  11 +-
 54 files changed, 1124 insertions(+), 212 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
similarity index 53%
rename from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
rename to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
index e448711660d..5c90cfbb7b1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
@@ -17,48 +17,44 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
-import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Map.Entry;
 
 /**
- * Query result set for check migration.
+ * Show migration check status query result set.
  */
-public final class CheckMigrationQueryResultSet implements DatabaseDistSQLResultSet {
+public final class ShowMigrationCheckStatusQueryResultSet implements DatabaseDistSQLResultSet {
     
-    private static final MigrationJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
+    private static final ConsistencyCheckJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI();
     
     private Iterator<Collection<Object>> data;
     
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
-        CheckMigrationStatement checkMigrationStatement = (CheckMigrationStatement) sqlStatement;
-        AlgorithmSegment typeStrategy = checkMigrationStatement.getTypeStrategy();
-        Map<String, DataConsistencyCheckResult> checkResultMap = null == typeStrategy
-                ? JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId())
-                : JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId(), typeStrategy.getName(), typeStrategy.getProps());
-        data = checkResultMap.entrySet().stream()
-                .map(each -> {
-                    Collection<Object> result = new LinkedList<>();
-                    result.add(each.getKey());
-                    result.add(each.getValue().getCountCheckResult().getSourceRecordsCount());
-                    result.add(each.getValue().getCountCheckResult().getTargetRecordsCount());
-                    result.add(each.getValue().getCountCheckResult().isMatched() + "");
-                    result.add(each.getValue().getContentCheckResult().isMatched() + "");
-                    return result;
-                }).collect(Collectors.toList()).iterator();
+        ShowMigrationCheckStatusStatement checkMigrationStatement = (ShowMigrationCheckStatusStatement) sqlStatement;
+        Map<String, DataConsistencyCheckResult> consistencyCheckResult = JOB_API.getLatestDataConsistencyCheckResult(checkMigrationStatement.getJobId());
+        List<Collection<Object>> result = new ArrayList<>(consistencyCheckResult.size());
+        for (Entry<String, DataConsistencyCheckResult> entry : consistencyCheckResult.entrySet()) {
+            DataConsistencyCheckResult value = entry.getValue();
+            DataConsistencyCountCheckResult countCheckResult = value.getCountCheckResult();
+            result.add(Arrays.asList(entry.getKey(), countCheckResult.getSourceRecordsCount(), countCheckResult.getTargetRecordsCount(), String.valueOf(countCheckResult.isMatched()),
+                    String.valueOf(value.getContentCheckResult().isMatched())));
+        }
+        data = result.iterator();
     }
     
     @Override
@@ -78,6 +74,6 @@ public final class CheckMigrationQueryResultSet implements DatabaseDistSQLResult
     
     @Override
     public String getType() {
-        return CheckMigrationStatement.class.getName();
+        return ShowMigrationCheckStatusStatement.class.getName();
     }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
new file mode 100644
index 00000000000..275b158f8a9
--- /dev/null
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.migration.distsql.handler.update;
+
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
+import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
+import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Check migration job updater.
+ */
+public final class CheckMigrationJobUpdater implements RALUpdater<CheckMigrationStatement> {
+    
+    private static final ConsistencyCheckJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI();
+    
+    @Override
+    public void executeUpdate(final String databaseName, final CheckMigrationStatement sqlStatement) throws SQLException {
+        AlgorithmSegment typeStrategy = sqlStatement.getTypeStrategy();
+        String algorithmTypeName = null != typeStrategy ? typeStrategy.getName() : null;
+        Properties algorithmProps = null != typeStrategy ? typeStrategy.getProps() : null;
+        JOB_API.createJobAndStart(new CreateConsistencyCheckJobParameter(sqlStatement.getJobId(), algorithmTypeName, algorithmProps));
+    }
+    
+    @Override
+    public String getType() {
+        return CheckMigrationStatement.class.getName();
+    }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet
index 51f4bb8bc3a..9f5e9244628 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.query.DistSQLResultSet
@@ -23,7 +23,7 @@ org.apache.shardingsphere.sharding.distsql.handler.query.ShardingAuditorsQueryRe
 org.apache.shardingsphere.sharding.distsql.handler.query.ShardingTableNodesQueryResultSet
 org.apache.shardingsphere.sharding.distsql.handler.query.ShardingKeyGeneratorsQueryResultSet
 org.apache.shardingsphere.sharding.distsql.handler.query.DefaultShardingStrategyQueryResultSet
-org.apache.shardingsphere.migration.distsql.handler.query.CheckMigrationQueryResultSet
+org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckStatusQueryResultSet
 org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationListQueryResultSet
 org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationJobStatusQueryResultSet
 org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckAlgorithmsQueryResultSet
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
index c7d37afa4af..0c026e4a415 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
@@ -22,3 +22,4 @@ org.apache.shardingsphere.migration.distsql.handler.update.CommitMigrationUpdate
 org.apache.shardingsphere.migration.distsql.handler.update.RollbackMigrationUpdater
 org.apache.shardingsphere.migration.distsql.handler.update.AddMigrationSourceResourceUpdater
 org.apache.shardingsphere.migration.distsql.handler.update.DropMigrationSourceResourceUpdater
+org.apache.shardingsphere.migration.distsql.handler.update.CheckMigrationJobUpdater
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
index 787564c0679..e595c00fcc4 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
@@ -51,6 +51,10 @@ checkMigration
     : CHECK MIGRATION jobId (BY algorithmDefinition)?
     ;
 
+showMigrationCheckStatus
+    : SHOW MIGRATION CHECK STATUS jobId
+    ;
+
 showMigrationCheckAlgorithms
     : SHOW MIGRATION CHECK ALGORITHMS
     ;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
index 2fb0c257080..40237cd1699 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
@@ -32,5 +32,6 @@ execute
     | addMigrationSourceResource
     | dropMigrationSourceResource
     | showMigrationSourceResources
+    | showMigrationCheckStatus
     ) SEMI?
     ;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
index ba5be0ce157..b7c41e605e1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatemen
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ResourceDefinitionContext;
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RollbackMigrationContext;
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckAlgorithmsContext;
+import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckStatusContext;
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationListContext;
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationSourceResourcesContext;
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationStatusContext;
@@ -48,6 +49,7 @@ import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStat
 import org.apache.shardingsphere.migration.distsql.statement.DropMigrationSourceResourceStatement;
 import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
 import org.apache.shardingsphere.migration.distsql.statement.RollbackMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceResourcesStatement;
@@ -190,4 +192,9 @@ public final class MigrationDistSQLStatementVisitor extends MigrationDistSQLStat
     public ASTNode visitShowMigrationSourceResources(final ShowMigrationSourceResourcesContext ctx) {
         return new ShowMigrationSourceResourcesStatement();
     }
+    
+    @Override
+    public ASTNode visitShowMigrationCheckStatus(final ShowMigrationCheckStatusContext ctx) {
+        return new ShowMigrationCheckStatusStatement(getIdentifierValue(ctx.jobId()));
+    }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
index d2bbeaa772f..b8aa727568d 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
@@ -20,14 +20,14 @@ package org.apache.shardingsphere.migration.distsql.statement;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
 
 /**
  * Check migration statement.
  */
 @RequiredArgsConstructor
 @Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
+public final class CheckMigrationStatement extends UpdatableScalingRALStatement {
     
     private final String jobId;
     
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
similarity index 82%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
copy to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
index d2bbeaa772f..fe8a36c0e9d 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
@@ -19,17 +19,14 @@ package org.apache.shardingsphere.migration.distsql.statement;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
 import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
 
 /**
- * Check migration statement.
+ * Show migration check status statement.
  */
 @RequiredArgsConstructor
 @Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
+public final class ShowMigrationCheckStatusStatement extends QueryableScalingRALStatement {
     
     private final String jobId;
-    
-    private final AlgorithmSegment typeStrategy;
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
index 47227004a30..4eb277b6d95 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
@@ -22,5 +22,5 @@ import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableS
 /**
  * Show migration source resources statement.
  */
-public class ShowMigrationSourceResourcesStatement extends QueryableScalingRALStatement {
+public final class ShowMigrationSourceResourcesStatement extends QueryableScalingRALStatement {
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
new file mode 100644
index 00000000000..cb6ed9ac55d
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api;
+
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+
+import java.util.Map;
+
+/**
+ * Consistency check job public API.
+ */
+@SingletonSPI
+public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, RequiredSPI {
+    
+    /**
+     * Create consistency check job and start it.
+     *
+     * @param parameter create consistency check job parameter
+     * @return job id
+     */
+    String createJobAndStart(CreateConsistencyCheckJobParameter parameter);
+    
+    /**
+     * Get latest data consistency check result.
+     *
+     * @param jobId job id
+     * @return latest data consistency check result
+     */
+    Map<String, DataConsistencyCheckResult> getLatestDataConsistencyCheckResult(String jobId);
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index a6ab527ac32..725630e86cc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -30,6 +30,7 @@ public final class PipelineJobPublicAPIFactory {
     static {
         ShardingSphereServiceLoader.register(InventoryIncrementalJobPublicAPI.class);
         ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
+        ShardingSphereServiceLoader.register(ConsistencyCheckJobPublicAPI.class);
     }
     
     /**
@@ -50,4 +51,13 @@ public final class PipelineJobPublicAPIFactory {
     public static MigrationJobPublicAPI getMigrationJobPublicAPI() {
         return RequiredSPIRegistry.getRegisteredService(MigrationJobPublicAPI.class);
     }
+    
+    /**
+     * Get instance of consistency check job public API.
+     *
+     * @return got instance
+     */
+    public static ConsistencyCheckJobPublicAPI getConsistencyCheckJobPublicAPI() {
+        return RequiredSPIRegistry.getRegisteredService(ConsistencyCheckJobPublicAPI.class);
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResult.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResult.java
new file mode 100644
index 00000000000..661f2d58102
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+/**
+ * YAML data consistency check result, composed of count check result and content check result.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public final class YamlDataConsistencyCheckResult implements YamlConfiguration {
+    
+    private YamlDataConsistencyCountCheckResult countCheckResult;
+    
+    private YamlDataConsistencyContentCheckResult contentCheckResult;
+    
+    /**
+     * YAML data consistency count check result, with source records count, target records count and matched flag.
+     */
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class YamlDataConsistencyCountCheckResult {
+        
+        private long sourceRecordsCount;
+        
+        private long targetRecordsCount;
+        
+        private boolean matched;
+    }
+    
+    /**
+     * YAML data consistency content check result, with matched flag.
+     */
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class YamlDataConsistencyContentCheckResult {
+        
+        private boolean matched;
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResultSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResultSwapper.java
new file mode 100644
index 00000000000..aadf937336c
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/yaml/YamlDataConsistencyCheckResultSwapper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResult.YamlDataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResult.YamlDataConsistencyCountCheckResult;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML data consistency check result swapper, swaps between data consistency check result and its YAML configuration.
+ */
+public final class YamlDataConsistencyCheckResultSwapper implements YamlConfigurationSwapper<YamlDataConsistencyCheckResult, DataConsistencyCheckResult> {
+    
+    private static final YamlDataConsistencyCheckResultSwapper CONFIG_SWAPPER = new YamlDataConsistencyCheckResultSwapper();
+    
+    @Override
+    public YamlDataConsistencyCheckResult swapToYamlConfiguration(final DataConsistencyCheckResult data) {
+        YamlDataConsistencyCountCheckResult countCheckResult = new YamlDataConsistencyCountCheckResult();
+        countCheckResult.setSourceRecordsCount(data.getCountCheckResult().getSourceRecordsCount());
+        countCheckResult.setTargetRecordsCount(data.getCountCheckResult().getTargetRecordsCount());
+        countCheckResult.setMatched(data.getCountCheckResult().isMatched()); // count flag must come from the count check result, not the content one
+        YamlDataConsistencyCheckResult result = new YamlDataConsistencyCheckResult();
+        result.setCountCheckResult(countCheckResult);
+        YamlDataConsistencyContentCheckResult contentCheckResult = new YamlDataConsistencyContentCheckResult(data.getContentCheckResult().isMatched());
+        result.setContentCheckResult(contentCheckResult);
+        return result;
+    }
+    
+    @Override
+    public DataConsistencyCheckResult swapToObject(final YamlDataConsistencyCheckResult yamlConfig) {
+        if (null == yamlConfig) {
+            return null;
+        }
+        YamlDataConsistencyCountCheckResult yamlCountCheck = yamlConfig.getCountCheckResult();
+        DataConsistencyCountCheckResult countCheckResult = new DataConsistencyCountCheckResult(yamlCountCheck.getSourceRecordsCount(), yamlCountCheck.getTargetRecordsCount());
+        DataConsistencyContentCheckResult contentCheckResult = new DataConsistencyContentCheckResult(yamlConfig.getContentCheckResult().isMatched());
+        return new DataConsistencyCheckResult(countCheckResult, contentCheckResult);
+    }
+    
+    /**
+     * Swap YAML text to data consistency check result.
+     *
+     * @param parameter YAML text of data consistency check result
+     * @return data consistency check result, or null if the unmarshalled YAML configuration is null
+     */
+    public static DataConsistencyCheckResult swapToObject(final String parameter) {
+        YamlDataConsistencyCheckResult yamlDataCheckResult = YamlEngine.unmarshal(parameter, YamlDataConsistencyCheckResult.class, true);
+        return CONFIG_SWAPPER.swapToObject(yamlDataCheckResult);
+    }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
similarity index 56%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
index d2bbeaa772f..3102c2bbb9d 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
@@ -15,21 +15,44 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.api.config.job;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Properties;
 
 /**
- * Check migration statement.
+ * Consistency check job configuration.
  */
 @RequiredArgsConstructor
 @Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
+@Slf4j
+@ToString
+public final class ConsistencyCheckJobConfiguration implements PipelineJobConfiguration {
     
     private final String jobId;
     
-    private final AlgorithmSegment typeStrategy;
+    private final String referredJobId;
+    
+    private final String algorithmTypeName;
+    
+    private final Properties algorithmProperties;
+    
+    @Override
+    public String getSourceDatabaseType() {
+        throw new UnsupportedOperationException("");
+    }
+    
+    /**
+     * Get job sharding count.
+     *
+     * @return job sharding count
+     */
+    @Override
+    public int getJobShardingCount() {
+        return 1;
+    }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
similarity index 57%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
index d2bbeaa772f..53ec03e9c42 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
@@ -15,21 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Properties;
 
 /**
- * Check migration statement.
+ * Consistency check job configuration for YAML.
  */
-@RequiredArgsConstructor
 @Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
+@Setter
+@Slf4j
+@ToString
+public final class YamlConsistencyCheckJobConfiguration implements YamlPipelineJobConfiguration {
+    
+    private String jobId;
+    
+    private String parentJobId;
+    
+    private String algorithmTypeName;
     
-    private final String jobId;
+    private Properties algorithmProperties;
     
-    private final AlgorithmSegment typeStrategy;
+    @Override
+    public String getTargetDatabaseName() {
+        throw new UnsupportedOperationException("");
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
new file mode 100644
index 00000000000..e7e07cdb678
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML consistency check job configuration swapper.
+ */
+public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlConfigurationSwapper<YamlConsistencyCheckJobConfiguration, ConsistencyCheckJobConfiguration> {
+    
+    private static final YamlConsistencyCheckJobConfigurationSwapper JOB_CONFIG_SWAPPER = new YamlConsistencyCheckJobConfigurationSwapper();
+    
+    @Override
+    public YamlConsistencyCheckJobConfiguration swapToYamlConfiguration(final ConsistencyCheckJobConfiguration data) {
+        YamlConsistencyCheckJobConfiguration result = new YamlConsistencyCheckJobConfiguration();
+        result.setJobId(data.getJobId());
+        result.setParentJobId(data.getReferredJobId());
+        result.setAlgorithmTypeName(data.getAlgorithmTypeName());
+        result.setAlgorithmProperties(data.getAlgorithmProperties());
+        return result;
+    }
+    
+    @Override
+    public ConsistencyCheckJobConfiguration swapToObject(final YamlConsistencyCheckJobConfiguration yamlConfig) {
+        return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(), yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(), yamlConfig.getAlgorithmProperties());
+    }
+    
+    /**
+     * Swap YAML text of job parameter to consistency check job configuration.
+     *
+     * @param jobParameter job parameter in YAML text
+     * @return consistency check job configuration, or null if job parameter is null
+     */
+    public static ConsistencyCheckJobConfiguration swapToObject(final String jobParameter) {
+        if (null == jobParameter) {
+            return null;
+        }
+        YamlConsistencyCheckJobConfiguration yamlJobConfig = YamlEngine.unmarshal(jobParameter, YamlConsistencyCheckJobConfiguration.class, true);
+        return JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
+    }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
similarity index 69%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
index 47227004a30..dbb318489b4 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
@@ -15,12 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.api.job.progress;
 
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 
 /**
- * Show migration source resources statement.
+ * Consistency check job item progress.
  */
-public class ShowMigrationSourceResourcesStatement extends QueryableScalingRALStatement {
+@Getter
+@Setter
+public final class ConsistencyCheckJobProgress implements PipelineJobItemProgress {
+    
+    private JobStatus status = JobStatus.RUNNING;
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateConsistencyCheckJobParameter.java
similarity index 68%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateConsistencyCheckJobParameter.java
index d2bbeaa772f..deaf027368a 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateConsistencyCheckJobParameter.java
@@ -15,21 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.api.pojo;
 
-import lombok.Getter;
+import lombok.Data;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+
+import java.util.Properties;
 
 /**
- * Check migration statement.
+ * Create consistency check job parameter.
  */
+@Data
 @RequiredArgsConstructor
-@Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
+public final class CreateConsistencyCheckJobParameter {
     
     private final String jobId;
     
-    private final AlgorithmSegment typeStrategy;
+    private final String algorithmTypeName;
+    
+    private final Properties algorithmProps;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index b0c6f88c255..bf47fc38033 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -17,12 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api;
 
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Optional;
+import java.util.Map;
 
 /**
  * Governance repository API.
@@ -56,44 +57,38 @@ public interface GovernanceRepositoryAPI {
     String getJobItemProgress(String jobId, int shardingItem);
     
     /**
-     * Persist check latest result.
-     *
-     * @param jobId job id
-     * @param checkSuccess check success
-     */
-    void persistCheckLatestResult(String jobId, boolean checkSuccess);
-    
-    /**
-     * Get check latest result.
+     * Get check latest job id.
      *
      * @param jobId job id
-     * @return check result
+     * @return check job id
      */
-    Optional<Boolean> getCheckLatestResult(String jobId);
+    String getCheckLatestJobId(String jobId);
     
     /**
-     * Persist check latest detailed result.
+     * Persist check latest job id.
      *
      * @param jobId job id
-     * @param checkDetailedSuccess check detailed success
+     * @param checkJobId check job id
      */
-    void persistCheckLatestDetailedResult(String jobId, String checkDetailedSuccess);
+    void persistCheckLatestJobId(String jobId, String checkJobId);
     
     /**
-     * Get check latest detailed result.
+     * Get check job result.
      *
      * @param jobId job id
-     * @return check detailed result
+     * @param checkJobId check job id
+     * @return check job result
      */
-    Optional<String> getCheckLatestDetailedResult(String jobId);
+    Map<String, DataConsistencyCheckResult> getCheckJobResult(String jobId, String checkJobId);
     
     /**
-     * Persist check job id.
+     * Persist check job result.
      *
      * @param jobId job id
      * @param checkJobId check job id
+     * @param dataConsistencyCheckResult check result
      */
-    void persistCheckJobId(String jobId, String checkJobId);
+    void persistCheckJobResult(String jobId, String checkJobId, Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult);
     
     /**
      * List check job ids.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index d0be204a7d0..31ce5d6a086 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
@@ -178,7 +177,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         String jobId = jobConfig.getJobId();
         Map<String, DataConsistencyCheckResult> result = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
         log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculateAlgorithm.getType(), result);
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
         return result;
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 7a2f7f141a8..3af215a37b2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -17,19 +17,26 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
-import com.google.common.base.Strings;
-import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.yaml.YamlDataConsistencyCheckResultSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Optional;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.stream.Collectors;
 
 /**
@@ -57,32 +64,44 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     }
     
     @Override
-    public void persistCheckLatestResult(final String jobId, final boolean checkSuccess) {
-        log.info("persist check latest result '{}' for job {}", checkSuccess, jobId);
-        repository.persist(PipelineMetaDataNode.getCheckLatestResultPath(jobId), String.valueOf(checkSuccess));
+    public String getCheckLatestJobId(final String jobId) {
+        return repository.get(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId));
     }
     
     @Override
-    public Optional<Boolean> getCheckLatestResult(final String jobId) {
-        String data = repository.get(PipelineMetaDataNode.getCheckLatestResultPath(jobId));
-        return Strings.isNullOrEmpty(data) ? Optional.empty() : Optional.of(Boolean.parseBoolean(data));
+    public void persistCheckLatestJobId(final String jobId, final String checkJobId) {
+        log.info("persist check job id '{}' for job {}", checkJobId, jobId);
+        repository.persist(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId), String.valueOf(checkJobId));
     }
     
+    @SuppressWarnings("unchecked")
     @Override
-    public void persistCheckLatestDetailedResult(final String jobId, @NonNull final String checkDetailedSuccess) {
-        log.info("persist check latest detailed result, jobId={}, checkDetailedSuccess={}", jobId, checkDetailedSuccess);
-        repository.persist(PipelineMetaDataNode.getCheckLatestDetailedResultPath(jobId), checkDetailedSuccess);
+    public Map<String, DataConsistencyCheckResult> getCheckJobResult(final String jobId, final String checkJobId) {
+        Map<String, DataConsistencyCheckResult> result = new HashMap<>();
+        String checkJobText = repository.get(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
+        if (StringUtils.isBlank(checkJobText)) {
+            return Collections.emptyMap();
+        }
+        Map<String, String> checkJobConfigMap = YamlEngine.unmarshal(checkJobText, Map.class, true);
+        for (Entry<String, String> entry : checkJobConfigMap.entrySet()) {
+            result.put(entry.getKey(), YamlDataConsistencyCheckResultSwapper.swapToObject(entry.getValue()));
+        }
+        return result;
     }
     
     @Override
-    public Optional<String> getCheckLatestDetailedResult(final String jobId) {
-        return Optional.ofNullable(repository.get(PipelineMetaDataNode.getCheckLatestDetailedResultPath(jobId)));
-    }
-    
-    @Override
-    public void persistCheckJobId(final String jobId, final String checkJobId) {
-        log.info("persist check job id, jobId={}, checkJobId={}", jobId, checkJobId);
-        repository.persist(PipelineMetaDataNode.getCheckJobIdPath(jobId, checkJobId), "");
+    public void persistCheckJobResult(final String jobId, final String checkJobId, final Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult) {
+        if (null == dataConsistencyCheckResult) {
+            log.warn("data consistency check is null, jobId {}, checkJobId {}", jobId, checkJobId);
+            return;
+        }
+        log.info("persist check job result '{}' for job {}", checkJobId, jobId);
+        Map<String, String> checkResultMap = new LinkedHashMap<>();
+        for (Entry<String, DataConsistencyCheckResult> entry : dataConsistencyCheckResult.entrySet()) {
+            YamlDataConsistencyCheckResult checkResult = new YamlDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
+            checkResultMap.put(entry.getKey(), YamlEngine.marshal(checkResult));
+        }
+        repository.persist(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId), YamlEngine.marshal(checkResultMap));
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
index a73e812ac52..1a5c7cf30dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpe
  */
 public final class PipelineJobExecutionException extends PipelineSQLException {
     
-    private static final long serialVersionUID = -5530453461378051166L;
+    private static final long serialVersionUID = -8462847591661221914L;
     
     public PipelineJobExecutionException(final String taskId, final Throwable cause) {
         super(XOpenSQLState.GENERAL_ERROR, 94, "Task `%s` execute failed", taskId, cause.getMessage());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
similarity index 73%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
index a73e812ac52..dff0e6a1905 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
 import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Pipeline job execution exception.
+ * Pipeline job has already existed exception.
  */
-public final class PipelineJobExecutionException extends PipelineSQLException {
+public final class PipelineJobHasAlreadyExistedException extends PipelineSQLException {
     
-    private static final long serialVersionUID = -5530453461378051166L;
+    private static final long serialVersionUID = 2854259384634892428L;
     
-    public PipelineJobExecutionException(final String taskId, final Throwable cause) {
-        super(XOpenSQLState.GENERAL_ERROR, 94, "Task `%s` execute failed", taskId, cause.getMessage());
+    public PipelineJobHasAlreadyExistedException(final String jobId) {
+        super(XOpenSQLState.GENERAL_ERROR, 86, "Job `%s` has already existed", jobId);
     }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
similarity index 72%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
index 47227004a30..05dd63bb70c 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
@@ -15,12 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import lombok.Data;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 /**
- * Show migration source resources statement.
+ * YAML consistency check job progress.
  */
-public class ShowMigrationSourceResourcesStatement extends QueryableScalingRALStatement {
+@Data
+public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
+    
+    private String status;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
new file mode 100644
index 00000000000..8796af3c6d8
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML consistency check job progress swapper.
+ */
+public final class YamlConsistencyCheckJobProgressSwapper implements YamlConfigurationSwapper<YamlConsistencyCheckJobProgress, ConsistencyCheckJobProgress> {
+    
+    @Override
+    public YamlConsistencyCheckJobProgress swapToYamlConfiguration(final ConsistencyCheckJobProgress data) {
+        YamlConsistencyCheckJobProgress result = new YamlConsistencyCheckJobProgress();
+        result.setStatus(data.getStatus().name());
+        return result;
+    }
+    
+    @Override
+    public ConsistencyCheckJobProgress swapToObject(final YamlConsistencyCheckJobProgress yamlConfig) {
+        ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
+        result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
+        return result;
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 3d459741913..3508ac9b71d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -30,7 +30,7 @@ import java.util.regex.Pattern;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class PipelineMetaDataNode {
     
-    private static final String JOB_PATTERN_PREFIX = DataPipelineConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-f]+)";
+    private static final String JOB_PATTERN_PREFIX = DataPipelineConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-z]+)";
     
     public static final Pattern CONFIG_PATTERN = Pattern.compile(JOB_PATTERN_PREFIX + "/config");
     
@@ -120,23 +120,24 @@ public final class PipelineMetaDataNode {
     }
     
     /**
-     * Get check latest result path.
+     * Get check latest job id path.
      *
      * @param jobId job id
-     * @return check latest result path
+     * @return check latest job id path
      */
-    public static String getCheckLatestResultPath(final String jobId) {
-        return String.join("/", getJobRootPath(jobId), "check", "latest_result");
+    public static String getCheckLatestJobIdPath(final String jobId) {
+        return String.join("/", getJobRootPath(jobId), "check", "latest_job_id");
     }
     
     /**
-     * Get check latest detailed result path.
+     * Get check job result path.
      *
      * @param jobId job id
-     * @return check latest detailed result path
+     * @param checkJobId check job id
+     * @return check job result path
      */
-    public static String getCheckLatestDetailedResultPath(final String jobId) {
-        return String.join("/", getJobRootPath(jobId), "check", "latest_detailed_result");
+    public static String getCheckJobResultPath(final String jobId, final String checkJobId) {
+        return String.join("/", getCheckJobIdsRootPath(jobId), checkJobId);
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
new file mode 100644
index 00000000000..ea606b8e296
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Consistency check job configuration changed processor.
+ */
+@Slf4j
+public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
+    
+    @Override
+    public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
+        String jobId = jobConfigPOJO.getJobName();
+        if (jobConfigPOJO.isDisabled()) {
+            log.info("{} is disabled", jobId);
+            PipelineJobCenter.stop(jobId);
+            return;
+        }
+        switch (eventType) {
+            case ADDED:
+            case UPDATED:
+                if (PipelineJobCenter.isJobExisting(jobId)) {
+                    log.info("{} added to executing jobs failed since it already exists", jobId);
+                } else {
+                    log.info("{} executing jobs", jobId);
+                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+                        if (null != throwable) {
+                            log.error("execute failed, jobId={}", jobId, throwable);
+                        }
+                    });
+                }
+                break;
+            case DELETED:
+                log.info("deleted consistency check job id: {}", jobId);
+                PipelineJobCenter.stop(jobId);
+                break;
+            default:
+                break;
+        }
+    }
+    
+    private void execute(final JobConfigurationPOJO jobConfigPOJO) {
+        ConsistencyCheckJob job = new ConsistencyCheckJob();
+        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
+        job.setOneOffJobBootstrap(oneOffJobBootstrap);
+        oneOffJobBootstrap.execute();
+    }
+    
+    @Override
+    public String getType() {
+        return JobType.CONSISTENCY_CHECK.getTypeName();
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
new file mode 100644
index 00000000000..14cb1c04245
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+
+import java.util.Map;
+
+/**
+ * Consistency check job.
+ */
+@Slf4j
+public final class ConsistencyCheckJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
+    
+    private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+    
+    private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
+    
+    @Override
+    public void execute(final ShardingContext shardingContext) {
+        String checkJobId = shardingContext.getJobName();
+        setJobId(checkJobId);
+        ConsistencyCheckJobConfiguration consistencyCheckJobConfig = YamlConsistencyCheckJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
+        ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, JobStatus.RUNNING); // stays RUNNING until the check result is persisted, so a duplicate check is rejected while this one is in flight
+        jobAPI.persistJobItemProgress(jobItemContext);
+        String referredJobId = consistencyCheckJobConfig.getReferredJobId();
+        log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, referredJobId);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(referredJobId, checkJobId);
+        JobType jobType = PipelineJobIdUtils.parseJobType(referredJobId);
+        InventoryIncrementalJobPublicAPI jobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
+        Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult;
+        if (StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())) {
+            dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(referredJobId);
+        } else {
+            dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(referredJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProperties());
+        }
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(referredJobId, checkJobId, dataConsistencyCheckResult);
+        jobItemContext.setStatus(JobStatus.FINISHED);
+        jobAPI.persistJobItemProgress(jobItemContext);
+        jobAPI.stop(checkJobId);
+        log.info("execute consistency check job finished, job id:{}, referred job id:{}", checkJobId, referredJobId);
+    }
+    
+    @Override
+    public void stop() {
+        setStopping(true);
+        if (null != getOneOffJobBootstrap()) {
+            getOneOffJobBootstrap().shutdown();
+        }
+        if (null == getJobId()) {
+            log.info("stop consistency check job, jobId is null, ignore");
+            return;
+        }
+        String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
+        pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 0);
+    }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
similarity index 64%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
index 47227004a30..dca95957b5c 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationSourceResourcesStatement.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
@@ -15,12 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
 
 /**
- * Show migration source resources statement.
+ * Consistency check job API.
  */
-public class ShowMigrationSourceResourcesStatement extends QueryableScalingRALStatement {
+public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI, PipelineJobAPI, PipelineJobItemAPI {
+    
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactory.java
similarity index 55%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactory.java
index a73e812ac52..858d79a60b8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactory.java
@@ -15,19 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.exception.job;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
-import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 
 /**
- * Pipeline job execution exception.
+ * Consistency check job API factory.
  */
-public final class PipelineJobExecutionException extends PipelineSQLException {
+public final class ConsistencyCheckJobAPIFactory {
     
-    private static final long serialVersionUID = -5530453461378051166L;
+    static {
+        ShardingSphereServiceLoader.register(ConsistencyCheckJobAPI.class);
+    }
     
-    public PipelineJobExecutionException(final String taskId, final Throwable cause) {
-        super(XOpenSQLState.GENERAL_ERROR, 94, "Task `%s` execute failed", taskId, cause.getMessage());
+    /**
+     * Get instance of consistency check job API.
+     *
+     * @return got instance
+     */
+    public static ConsistencyCheckJobAPI getInstance() {
+        return RequiredSPIRegistry.getRegisteredService(ConsistencyCheckJobAPI.class);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
new file mode 100644
index 00000000000..1512158497c
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlConsistencyCheckJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Consistency check job API impl.
+ */
+@Slf4j
+public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl implements ConsistencyCheckJobAPI {
+    
+    private static final YamlConsistencyCheckJobProgressSwapper PROGRESS_SWAPPER = new YamlConsistencyCheckJobProgressSwapper();
+    
+    @Override
+    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
+        ConsistencyCheckJobId jobId = (ConsistencyCheckJobId) pipelineJobId;
+        return jobId.getPipelineJobId() + jobId.getSequence();
+    }
+    
+    @Override
+    public String createJobAndStart(final CreateConsistencyCheckJobParameter parameter) {
+        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
+        String checkLatestJobId = repositoryAPI.getCheckLatestJobId(parameter.getJobId());
+        if (StringUtils.isNotBlank(checkLatestJobId)) {
+            PipelineJobItemProgress progress = getJobItemProgress(checkLatestJobId, 0);
+            if (null != progress && JobStatus.FINISHED != progress.getStatus()) {
+                log.info("check job already existed and status isn't FINISHED, status {}", progress.getStatus());
+                throw new PipelineJobHasAlreadyExistedException(checkLatestJobId);
+            }
+        }
+        int consistencyCheckVersionNew = null == checkLatestJobId ? 0 : ConsistencyCheckJobId.getSequence(checkLatestJobId) + 1;
+        YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration();
+        ConsistencyCheckJobId checkJobId = new ConsistencyCheckJobId(parameter.getJobId(), consistencyCheckVersionNew);
+        String result = marshalJobId(checkJobId);
+        yamlConfig.setJobId(result);
+        yamlConfig.setParentJobId(parameter.getJobId());
+        yamlConfig.setAlgorithmTypeName(parameter.getAlgorithmTypeName());
+        yamlConfig.setAlgorithmProperties(parameter.getAlgorithmProps());
+        ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig);
+        start(jobConfig);
+        return result;
+    }
+    
+    @Override
+    public Map<String, DataConsistencyCheckResult> getLatestDataConsistencyCheckResult(final String jobId) {
+        String checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(jobId);
+        if (StringUtils.isBlank(checkLatestJobId)) {
+            return Collections.emptyMap();
+        }
+        return PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(jobId, checkLatestJobId);
+    }
+    
+    @Override
+    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+        ConsistencyCheckJobProgress progress = new ConsistencyCheckJobProgress();
+        progress.setStatus(jobItemContext.getStatus());
+        String marshalledProgress = YamlEngine.marshal(PROGRESS_SWAPPER.swapToYamlConfiguration(progress));
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), marshalledProgress);
+    }
+    
+    @Override
+    public PipelineJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+        String yamlText = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+        if (StringUtils.isBlank(yamlText)) {
+            return null;
+        }
+        YamlConsistencyCheckJobProgress yamlProgress = YamlEngine.unmarshal(yamlText, YamlConsistencyCheckJobProgress.class, true);
+        ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
+        result.setStatus(PROGRESS_SWAPPER.swapToObject(yamlProgress).getStatus());
+        return result;
+    }
+    
+    @Override
+    public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
+        ConsistencyCheckJobProgress progress = (ConsistencyCheckJobProgress) getJobItemProgress(jobId, shardingItem);
+        if (null != progress) {
+            progress.setStatus(status);
+            PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(PROGRESS_SWAPPER.swapToYamlConfiguration(progress)));
+        } else {
+            log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
+        }
+    }
+    
+    @Override
+    public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) {
+        return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+    }
+    
+    @Override
+    protected ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
+        return YamlConsistencyCheckJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
+    }
+    
+    @Override
+    protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
+        return new YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration((ConsistencyCheckJobConfiguration) jobConfig);
+    }
+    
+    @Override
+    public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
+    }
+    
+    @Override
+    public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+        return null;
+    }
+    
+    @Override
+    public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
+        return null;
+    }
+    
+    @Override
+    protected PipelineJobInfo getJobInfo(final String jobId) {
+        return null;
+    }
+    
+    @Override
+    protected String getJobClassName() {
+        return ConsistencyCheckJob.class.getName();
+    }
+    
+    @Override
+    public JobType getJobType() {
+        return JobType.CONSISTENCY_CHECK;
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index c5bc8680f5c..623feb8469c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -23,9 +23,6 @@ import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
 
-import java.time.Instant;
-import java.time.format.DateTimeFormatter;
-
 /**
  * Consistency check job id.
  */
@@ -35,15 +32,30 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
-    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+    private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
     
     private final String pipelineJobId;
     
-    private final String createTimeMinutes;
+    private final int sequence;
     
-    public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final long createTimeMillis) {
+    public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final int sequence) {
         super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
         this.pipelineJobId = pipelineJobId;
-        this.createTimeMinutes = DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(createTimeMillis));
+        if (sequence > MAX_CONSISTENCY_CHECK_VERSION) {
+            this.sequence = 0;
+        } else {
+            this.sequence = sequence;
+        }
+    }
+    
+    /**
+     * Get sequence from consistency check job id.
+     *
+     * @param consistencyCheckJobId consistency check job id
+     * @return sequence parsed from the last character of the job id
+     */
+    public static int getSequence(final @NonNull String consistencyCheckJobId) {
+        String versionString = consistencyCheckJobId.substring(consistencyCheckJobId.length() - 1);
+        return Integer.parseInt(versionString);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
new file mode 100644
index 00000000000..b3471ac39b4
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+
+/**
+ * Consistency check job item context.
+ */
+@Getter
+@Setter
+@Slf4j
+public final class ConsistencyCheckJobItemContext implements PipelineJobItemContext {
+    
+    private final String jobId;
+    
+    private final int shardingItem;
+    
+    private String dataSourceName;
+    
+    private volatile boolean stopping;
+    
+    private volatile JobStatus status;
+    
+    private final ConsistencyCheckJobConfiguration jobConfig;
+    
+    public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status) {
+        this.jobConfig = jobConfig;
+        jobId = jobConfig.getJobId();
+        this.shardingItem = shardingItem;
+        this.status = status;
+    }
+    
+    @Override
+    public PipelineProcessContext getJobProcessContext() {
+        throw new UnsupportedOperationException("getJobProcessContext is not supported by ConsistencyCheckJobItemContext");
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
similarity index 89%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
index 90857b277c4..4c63097dd84 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
index 90857b277c4..141ea167a06 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
@@ -16,3 +16,4 @@
 #
 
 org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
index 89ef92e051e..2f4035228af 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
@@ -16,3 +16,4 @@
 #
 
 org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationChangedJobConfigurationProcessor
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckChangedJobConfigurationProcessor
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI
similarity index 89%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI
index 90857b277c4..4c63097dd84 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
index 28683d1da23..a0681f81d4a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
@@ -71,13 +71,13 @@ public final class PipelineMetaDataNodeTest {
     }
     
     @Test
-    public void assertGetCheckLatestResultPath() {
-        assertThat(PipelineMetaDataNode.getCheckLatestResultPath(jobId), is(jobCheckRootPath + "/latest_result"));
+    public void assertGetCheckLatestJobIdPath() {
+        assertThat(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId), is(jobCheckRootPath + "/latest_job_id"));
     }
     
     @Test
-    public void assertGetCheckLatestDetailedResultPath() {
-        assertThat(PipelineMetaDataNode.getCheckLatestDetailedResultPath(jobId), is(jobCheckRootPath + "/latest_detailed_result"));
+    public void assertGetCheckJobResultPath() {
+        assertThat(PipelineMetaDataNode.getCheckJobResultPath(jobId, "j02fx123"), is(jobCheckRootPath + "/job_ids/j02fx123"));
     }
     
     @Test
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 445fe1a5492..b46a70d537c 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.integration.data.pipeline.cases.migration;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseITCase;
 import org.apache.shardingsphere.integration.data.pipeline.command.MigrationDistSQLCommand;
 import org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
@@ -28,9 +29,11 @@ import org.opengauss.util.PSQLException;
 
 import javax.xml.bind.JAXB;
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -159,10 +162,20 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
         return jobList.stream().filter(a -> a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new RuntimeException("not find " + tableName + " table")).get("id").toString();
     }
     
-    protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) {
-        List<Map<String, Object>> checkJobResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType));
+    protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) throws SQLException {
+        proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
+        List<Map<String, Object>> checkJobResults = Collections.emptyList();
+        for (int i = 0; i < 10; i++) {
+            checkJobResults = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
+            if (null != checkJobResults && !checkJobResults.isEmpty()) {
+                break;
+            }
+            ThreadUtil.sleep(3, TimeUnit.SECONDS);
+        }
+        assertTrue(null != checkJobResults && !checkJobResults.isEmpty());
         log.info("check job results: {}", checkJobResults);
         for (Map<String, Object> entry : checkJobResults) {
+            assertTrue(Boolean.parseBoolean(entry.get("records_count_matched").toString()));
             assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
         }
     }
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/QueryableScalingRALStatementAssert.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/QueryableScalingRALStatementAssert.java
index 9e1bdfa93e5..0f62fe8f217 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/QueryableScalingRALStatementAssert.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/QueryableScalingRALStatementAssert.java
@@ -20,20 +20,20 @@ package org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statemen
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
-import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceResourcesStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.SQLCaseAssertContext;
-import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.CheckMigrationStatementAssert;
+import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.ShowMigrationCheckStatusStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.ShowMigrationCheckAlgorithmsStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.ShowMigrationListStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.ShowMigrationSourceResourceStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query.ShowMigrationStatusStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.SQLParserTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ShowMigrationListStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CheckMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationCheckStatusStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationCheckAlgorithmsStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationSourceResourcesStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationStatusStatementTestCase;
@@ -57,8 +57,8 @@ public final class QueryableScalingRALStatementAssert {
             ShowMigrationListStatementAssert.assertIs(assertContext, (ShowMigrationListStatement) actual, (ShowMigrationListStatementTestCase) expected);
         } else if (actual instanceof ShowMigrationCheckAlgorithmsStatement) {
             ShowMigrationCheckAlgorithmsStatementAssert.assertIs(assertContext, (ShowMigrationCheckAlgorithmsStatement) actual, (ShowMigrationCheckAlgorithmsStatementTestCase) expected);
-        } else if (actual instanceof CheckMigrationStatement) {
-            CheckMigrationStatementAssert.assertIs(assertContext, (CheckMigrationStatement) actual, (CheckMigrationStatementTestCase) expected);
+        } else if (actual instanceof ShowMigrationCheckStatusStatement) {
+            ShowMigrationCheckStatusStatementAssert.assertIs(assertContext, (ShowMigrationCheckStatusStatement) actual, (ShowMigrationCheckStatusStatementTestCase) expected);
         } else if (actual instanceof ShowMigrationStatusStatement) {
             ShowMigrationStatusStatementAssert.assertIs(assertContext, (ShowMigrationStatusStatement) actual, (ShowMigrationStatusStatementTestCase) expected);
         } else if (actual instanceof ShowMigrationSourceResourcesStatement) {
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/UpdatableScalingRALStatementAssert.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/UpdatableScalingRALStatementAssert.java
index 808eac96ac6..e1dd898baad 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/UpdatableScalingRALStatementAssert.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/UpdatableScalingRALStatementAssert.java
@@ -22,28 +22,31 @@ import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
 import org.apache.shardingsphere.distsql.parser.statement.ral.updatable.DropPipelineProcessConfigurationStatement;
 import org.apache.shardingsphere.migration.distsql.statement.AddMigrationSourceResourceStatement;
-import org.apache.shardingsphere.migration.distsql.statement.RollbackMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
 import org.apache.shardingsphere.migration.distsql.statement.DropMigrationSourceResourceStatement;
 import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
-import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.RollbackMigrationStatement;
 import org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
 import org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.SQLCaseAssertContext;
+import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.CheckMigrationStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.AddMigrationSourceResourceStatementAssert;
-import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.RollbackMigrationStatementAssert;
-import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.DropPipelineProcessConfigurationStatementAssert;
+import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.CommitMigrationStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.DropMigrationSourceResourceStatementAssert;
+import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.DropPipelineProcessConfigurationStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.MigrateTableStatementAssert;
-import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.CommitMigrationStatementAssert;
+import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.RollbackMigrationStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.StartMigrationStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update.StopMigrationStatementAssert;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.SQLParserTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.AddMigrationSourceResourceStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.RollbackMigrationStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropPipelineProcessConfigurationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CheckMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CommitMigrationStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropMigrationSourceResourceStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropPipelineProcessConfigurationStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.MigrateTableStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CommitMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.RollbackMigrationStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.StartMigrationStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.StopMigrationStatementTestCase;
 
@@ -79,6 +82,8 @@ public final class UpdatableScalingRALStatementAssert {
             AddMigrationSourceResourceStatementAssert.assertIs(assertContext, (AddMigrationSourceResourceStatement) actual, (AddMigrationSourceResourceStatementTestCase) expected);
         } else if (actual instanceof DropMigrationSourceResourceStatement) {
             DropMigrationSourceResourceStatementAssert.assertIs(assertContext, (DropMigrationSourceResourceStatement) actual, (DropMigrationSourceResourceStatementTestCase) expected);
+        } else if (actual instanceof CheckMigrationStatement) {
+            CheckMigrationStatementAssert.assertIs(assertContext, (CheckMigrationStatement) actual, (CheckMigrationStatementTestCase) expected);
         }
     }
 }
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/ShowMigrationCheckStatusStatementAssert.java
similarity index 64%
copy from shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java
copy to shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/ShowMigrationCheckStatusStatementAssert.java
index d485086a5ca..f6ca28bf698 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/ShowMigrationCheckStatusStatementAssert.java
@@ -17,38 +17,35 @@
 
 package org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query;
 
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
 import org.apache.shardingsphere.test.sql.parser.parameterized.asserts.SQLCaseAssertContext;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.segment.impl.distsql.ExpectedAlgorithm;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CheckMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationCheckStatusStatementTestCase;
 
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
- * Check migration statement assert.
+ * Show migration check status statement assert.
  */
-public final class CheckMigrationStatementAssert {
+public final class ShowMigrationCheckStatusStatementAssert {
     
     /**
-     * Assert check migration statement is correct with expected parser result.
+     * Assert show migration check status statement is correct with expected parser result.
      *
      * @param assertContext assert context
      * @param actual actual check migration statement
      * @param expected expected check migration statement test case
      */
-    public static void assertIs(final SQLCaseAssertContext assertContext, final CheckMigrationStatement actual, final CheckMigrationStatementTestCase expected) {
+    public static void assertIs(final SQLCaseAssertContext assertContext, final ShowMigrationCheckStatusStatement actual, final ShowMigrationCheckStatusStatementTestCase expected) {
         if (null == expected) {
             assertNull(assertContext.getText("Actual statement should not exist."), actual);
         } else {
             assertNotNull(assertContext.getText("Actual statement should exist."), actual);
             assertJobIds(assertContext, actual.getJobId(), expected.getJobIds());
-            assertTypeStrategy(assertContext, actual.getTypeStrategy(), expected.getTableStrategies());
         }
     }
     
@@ -60,13 +57,4 @@ public final class CheckMigrationStatementAssert {
             assertThat(assertContext.getText("Job id assertion error"), actual, is(expected.iterator().next()));
         }
     }
-    
-    private static void assertTypeStrategy(final SQLCaseAssertContext assertContext, final AlgorithmSegment actual, final List<ExpectedAlgorithm> expected) {
-        if (expected.isEmpty()) {
-            assertNull(assertContext.getText("Actual type strategy should not exist."), actual);
-        } else {
-            assertNotNull(assertContext.getText("Actual type strategy should exist."), actual);
-            assertThat(assertContext.getText("Type strategy assertion error"), actual.getName(), is(expected.iterator().next().getName()));
-        }
-    }
 }
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/update/CheckMigrationStatementAssert.java
similarity index 98%
rename from shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java
rename to shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/update/CheckMigrationStatementAssert.java
index d485086a5ca..558e88b04f6 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/query/CheckMigrationStatementAssert.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/distsql/ral/impl/migration/update/CheckMigrationStatementAssert.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.query;
+package org.apache.shardingsphere.test.sql.parser.parameterized.asserts.statement.distsql.ral.impl.migration.update;
 
 import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
 import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
index cbf4cb1fec5..d8eba88a095 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
@@ -286,11 +286,11 @@ import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ClearHintStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ClearReadwriteSplittingHintStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ClearShardingHintStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ConvertYamlConfigurationStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.CreateTrafficRuleStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.DiscardDistSQLStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.DropTrafficRuleStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ExportDatabaseConfigurationStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ConvertYamlConfigurationStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.ImportDatabaseConfigurationStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.LabelInstanceStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.PrepareDistSQLStatementTestCase;
@@ -316,11 +316,12 @@ import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.UnlabelInstanceStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.AddMigrationSourceResourceStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CheckMigrationStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.RollbackMigrationStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropPipelineProcessConfigurationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CommitMigrationStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropMigrationSourceResourceStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.DropPipelineProcessConfigurationStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.MigrateTableStatementTestCase;
-import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.CommitMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.RollbackMigrationStatementTestCase;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationCheckStatusStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationCheckAlgorithmsStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationSourceResourcesStatementTestCase;
 import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration.ShowMigrationStatusStatementTestCase;
@@ -1015,6 +1016,9 @@ public final class SQLParserTestCases {
     @XmlElement(name = "show-migration-status")
     private final List<ShowMigrationStatusStatementTestCase> showMigrationStatusTestCases = new LinkedList<>();
     
+    @XmlElement(name = "show-migration-check-status")
+    private final List<ShowMigrationCheckStatusStatementTestCase> showMigrationCheckStatusTestCases = new LinkedList<>();
+    
     @XmlElement(name = "show-migration-check-algorithms")
     private final List<ShowMigrationCheckAlgorithmsStatementTestCase> showMigrationCheckAlgorithmsStatementTestCases = new LinkedList<>();
     
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/statement/distsql/ral/migration/ShowMigrationCheckStatusStatementTestCase.java
similarity index 60%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
copy to shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/statement/distsql/ral/migration/ShowMigrationCheckStatusStatementTestCase.java
index d2bbeaa772f..2db4fa294b7 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/CheckMigrationStatement.java
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/statement/distsql/ral/migration/ShowMigrationCheckStatusStatementTestCase.java
@@ -15,21 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.migration.distsql.statement;
+package org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.distsql.ral.migration;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
-import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
+import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain.statement.SQLParserTestCase;
+
+import javax.xml.bind.annotation.XmlElement;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
- * Check migration statement.
+ * Show migration check status statement test case.
  */
-@RequiredArgsConstructor
 @Getter
-public final class CheckMigrationStatement extends QueryableScalingRALStatement {
-    
-    private final String jobId;
+public final class ShowMigrationCheckStatusStatementTestCase extends SQLParserTestCase {
     
-    private final AlgorithmSegment typeStrategy;
+    @XmlElement(name = "job-id")
+    private final List<String> jobIds = new LinkedList<>();
 }
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/resources/case/ral/migration.xml b/shardingsphere-test/shardingsphere-parser-test/src/main/resources/case/ral/migration.xml
index 77aae07035d..36d8995dd40 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/resources/case/ral/migration.xml
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/resources/case/ral/migration.xml
@@ -38,6 +38,10 @@
         <job-id>123</job-id>
     </show-migration-status>
     
+    <show-migration-check-status sql-case-id="show-migration-check-status">
+        <job-id>123</job-id>
+    </show-migration-check-status>
+    
     <drop-migration-process-configuration sql-case-id="drop-migration-process-configuration-read">
         <conf-path>/READ</conf-path>
     </drop-migration-process-configuration>
diff --git a/shardingsphere-test/shardingsphere-parser-test/src/main/resources/sql/supported/ral/migration.xml b/shardingsphere-test/shardingsphere-parser-test/src/main/resources/sql/supported/ral/migration.xml
index 5586573b7ac..39aa309a54c 100644
--- a/shardingsphere-test/shardingsphere-parser-test/src/main/resources/sql/supported/ral/migration.xml
+++ b/shardingsphere-test/shardingsphere-parser-test/src/main/resources/sql/supported/ral/migration.xml
@@ -22,6 +22,7 @@
     <distsql-case id="show-migration-check-algorithms" value="SHOW MIGRATION CHECK ALGORITHMS;" />
     <distsql-case id="check-migration" value="CHECK MIGRATION 123;" />
     <distsql-case id="show-migration-status" value="SHOW MIGRATION STATUS 123;" />
+    <distsql-case id="show-migration-check-status" value="SHOW MIGRATION CHECK STATUS 123;" />
     <distsql-case id="check-migration-with-type" value="CHECK MIGRATION 123 by TYPE(name='DEFAULT', PROPERTIES('test-property'='4'));" />
     
     <distsql-case id="drop-migration-process-configuration-read" value="DROP MIGRATION PROCESS CONFIGURATION '/READ';" />
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
index 649fae2ed52..136871be3c2 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
 import org.junit.Test;
 
@@ -31,4 +32,9 @@ public final class PipelineAPIFactoryTest {
     public void assertGetPipelineJobAPI() {
         assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION), instanceOf(MigrationJobAPIImpl.class));
     }
+    
+    @Test
+    public void assertGetConsistencyCheckJobAPI() {
+        assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK), instanceOf(ConsistencyCheckJobAPIImpl.class));
+    }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index 912a9ebf2c2..21d62ecb150 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
 import org.junit.Test;
 
@@ -43,4 +44,9 @@ public final class PipelineJobPublicAPIFactoryTest {
     public void assertGetMigrationJobPublicAPI() {
         assertThat(PipelineJobPublicAPIFactory.getMigrationJobPublicAPI(), instanceOf(MigrationJobAPIImpl.class));
     }
+    
+    @Test
+    public void assertGetConsistencyCheckJobPublicAPI() {
+        assertThat(PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI(), instanceOf(ConsistencyCheckJobAPIImpl.class));
+    }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index af4e96eb5a1..d435b88413f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.api.impl;
 
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -40,18 +43,19 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.sql.Types;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Optional;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -76,9 +80,12 @@ public final class GovernanceRepositoryAPIImplTest {
     @Test
     public void assertPersistJobCheckResult() {
         MigrationJobItemContext jobItemContext = mockJobItemContext();
-        governanceRepositoryAPI.persistCheckLatestResult(jobItemContext.getJobId(), true);
-        Optional<Boolean> checkResult = governanceRepositoryAPI.getCheckLatestResult(jobItemContext.getJobId());
-        assertTrue(checkResult.isPresent() && checkResult.get());
+        Map<String, DataConsistencyCheckResult> actual = new HashMap<>();
+        actual.put("test", new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(1, 1), new DataConsistencyContentCheckResult(true)));
+        governanceRepositoryAPI.persistCheckJobResult(jobItemContext.getJobId(), "j02123", actual);
+        Map<String, DataConsistencyCheckResult> checkResult = governanceRepositoryAPI.getCheckJobResult(jobItemContext.getJobId(), "j02123");
+        assertThat(checkResult.size(), is(1));
+        assertTrue(checkResult.get("test").getContentCheckResult().isMatched());
     }
     
     @Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
new file mode 100644
index 00000000000..65907b3c29a
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
+
+public final class ConsistencyCheckJobAPIImplTest {
+    
+    private static ConsistencyCheckJobAPI jobAPI;
+    
+    @BeforeClass
+    public static void beforeClass() {
+        PipelineContextUtil.mockModeConfigAndContextManager();
+        jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+    }
+    
+    @Test
+    public void assertCreateJobConfig() {
+        String migrationJobId = "j0101test";
+        CreateConsistencyCheckJobParameter parameter = new CreateConsistencyCheckJobParameter(migrationJobId, null, null);
+        String checkJobId = jobAPI.createJobAndStart(parameter);
+        ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) jobAPI.getJobConfiguration(checkJobId);
+        String expectCheckJobId = "j0201j0101test0";
+        assertThat(jobConfig.getJobId(), is(expectCheckJobId));
+        assertNull(jobConfig.getAlgorithmTypeName());
+        int consistencyCheckVersion = ConsistencyCheckJobId.getSequence(expectCheckJobId);
+        assertThat(consistencyCheckVersion, is(0));
+    }
+}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 9770d830787..0f240d13966 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -31,8 +31,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -63,10 +61,10 @@ import java.util.Objects;
 import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 @Slf4j
@@ -222,10 +220,8 @@ public final class MigrationJobAPIImplTest {
         final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
         Optional<String> jobId = jobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
-        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
         MigrationJobItemContext jobItemContext = PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
         jobAPI.persistJobItemProgress(jobItemContext);
-        repositoryAPI.persistCheckLatestResult(jobId.get(), true);
         jobAPI.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
         Map<Integer, InventoryIncrementalJobItemProgress> progress = jobAPI.getJobProgress(jobId.get());
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : progress.entrySet()) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java
similarity index 66%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
copy to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java
index 649fae2ed52..a8cf8eeda01 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class PipelineAPIFactoryTest {
+public final class ConsistencyCheckJobAPIFactoryTest {
     
     @Test
-    public void assertGetPipelineJobAPI() {
-        assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION), instanceOf(MigrationJobAPIImpl.class));
+    public void assertGetInstance() {
+        assertThat(ConsistencyCheckJobAPIFactory.getInstance(), instanceOf(ConsistencyCheckJobAPIImpl.class));
     }
 }