You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/27 11:38:21 UTC

[shardingsphere] branch master updated: Add stop/start migration check DistSQL and implement (#21222)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e8e55f76470 Add stop/start migration check DistSQL and implement (#21222)
e8e55f76470 is described below

commit e8e55f76470cde4fa21c8c79e88f32a21dcded5b
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Sep 27 19:38:08 2022 +0800

    Add stop/start migration check DistSQL and implement (#21222)
---
 .../handler/update/StartMigrationCheckUpdater.java | 41 ++++++++++++++++++
 .../handler/update/StopMigrationCheckUpdater.java  | 41 ++++++++++++++++++
 ....shardingsphere.infra.distsql.update.RALUpdater |  2 +
 .../main/antlr4/imports/migration/RALStatement.g4  |  8 ++++
 .../parser/autogen/MigrationDistSQLStatement.g4    |  2 +
 .../core/MigrationDistSQLStatementVisitor.java     | 16 ++++++-
 .../statement/StartMigrationCheckStatement.java}   | 31 ++++++--------
 .../statement/StopMigrationCheckStatement.java}    | 31 ++++++--------
 .../pipeline/api/ConsistencyCheckJobPublicAPI.java | 14 ++++++
 .../job/ConsistencyCheckJobConfiguration.java      |  2 +-
 ...amlConsistencyCheckJobConfigurationSwapper.java |  2 +-
 .../data/pipeline/api/job/JobStatus.java           |  5 +++
 .../pipeline/core/api/GovernanceRepositoryAPI.java |  3 +-
 .../core/api/impl/GovernanceRepositoryAPIImpl.java |  5 ++-
 .../PipelineJobHasAlreadyFinishedException.java    | 32 ++++++--------
 .../consistencycheck/ConsistencyCheckJob.java      | 36 ++++++++++------
 .../ConsistencyCheckJobAPIImpl.java                | 50 ++++++++++++++++++----
 .../scenario/migration/MigrationJobAPIImpl.java    | 10 +++++
 .../atomic/storage/impl/OpenGaussContainer.java    |  2 +-
 .../data/pipeline/cases/base/BaseITCase.java       |  3 +-
 .../api/impl/ConsistencyCheckJobAPIImplTest.java   | 38 ++++++++++++++--
 21 files changed, 287 insertions(+), 87 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationCheckUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationCheckUpdater.java
new file mode 100644
index 00000000000..365d5256b37
--- /dev/null
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationCheckUpdater.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.migration.distsql.handler.update;
+
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
+import org.apache.shardingsphere.migration.distsql.statement.StartMigrationCheckStatement;
+
+/**
+ * Start migration check updater.
+ */
+public final class StartMigrationCheckUpdater implements RALUpdater<StartMigrationCheckStatement> {
+    
+    private static final ConsistencyCheckJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI();
+    
+    @Override
+    public void executeUpdate(final String databaseName, final StartMigrationCheckStatement sqlStatement) {
+        JOB_API.startByParentJobId(sqlStatement.getJobId());
+    }
+    
+    @Override
+    public String getType() {
+        return StartMigrationCheckStatement.class.getName();
+    }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationCheckUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationCheckUpdater.java
new file mode 100644
index 00000000000..0793ac26329
--- /dev/null
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationCheckUpdater.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.migration.distsql.handler.update;
+
+import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
+import org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckStatement;
+
+/**
+ * Stop migration check updater.
+ */
+public final class StopMigrationCheckUpdater implements RALUpdater<StopMigrationCheckStatement> {
+    
+    private static final ConsistencyCheckJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI();
+    
+    @Override
+    public void executeUpdate(final String databaseName, final StopMigrationCheckStatement sqlStatement) {
+        JOB_API.stopByParentJobId(sqlStatement.getJobId());
+    }
+    
+    @Override
+    public String getType() {
+        return StopMigrationCheckStatement.class.getName();
+    }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
index 0c026e4a415..d82c8b27b18 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.distsql.update.RALUpdater
@@ -23,3 +23,5 @@ org.apache.shardingsphere.migration.distsql.handler.update.RollbackMigrationUpda
 org.apache.shardingsphere.migration.distsql.handler.update.AddMigrationSourceResourceUpdater
 org.apache.shardingsphere.migration.distsql.handler.update.DropMigrationSourceResourceUpdater
 org.apache.shardingsphere.migration.distsql.handler.update.CheckMigrationJobUpdater
+org.apache.shardingsphere.migration.distsql.handler.update.StartMigrationCheckUpdater
+org.apache.shardingsphere.migration.distsql.handler.update.StopMigrationCheckUpdater
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
index e595c00fcc4..bd5925cc889 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/migration/RALStatement.g4
@@ -55,6 +55,14 @@ showMigrationCheckStatus
     : SHOW MIGRATION CHECK STATUS jobId
     ;
 
+stopMigrationCheck
+    : STOP MIGRATION CHECK jobId
+    ;
+
+startMigrationCheck
+    : START MIGRATION CHECK jobId
+    ;
+
 showMigrationCheckAlgorithms
     : SHOW MIGRATION CHECK ALGORITHMS
     ;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
index 40237cd1699..e203d02f9b7 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
@@ -33,5 +33,7 @@ execute
     | dropMigrationSourceResource
     | showMigrationSourceResources
     | showMigrationCheckStatus
+    | startMigrationCheck
+    | stopMigrationCheck
     ) SEMI?
     ;
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
index b7c41e605e1..f76d5d83e58 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
@@ -37,7 +37,9 @@ import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatemen
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationListContext;
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationSourceResourcesContext;
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationStatusContext;
+import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationCheckContext;
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationContext;
+import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationCheckContext;
 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationContext;
 import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
 import org.apache.shardingsphere.distsql.parser.segment.DataSourceSegment;
@@ -49,12 +51,14 @@ import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStat
 import org.apache.shardingsphere.migration.distsql.statement.DropMigrationSourceResourceStatement;
 import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
 import org.apache.shardingsphere.migration.distsql.statement.RollbackMigrationStatement;
-import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
+import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceResourcesStatement;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
+import org.apache.shardingsphere.migration.distsql.statement.StartMigrationCheckStatement;
 import org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
+import org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckStatement;
 import org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
 import org.apache.shardingsphere.sql.parser.api.visitor.ASTNode;
 import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
@@ -197,4 +201,14 @@ public final class MigrationDistSQLStatementVisitor extends MigrationDistSQLStat
     public ASTNode visitShowMigrationCheckStatus(final ShowMigrationCheckStatusContext ctx) {
         return new ShowMigrationCheckStatusStatement(getIdentifierValue(ctx.jobId()));
     }
+    
+    @Override
+    public ASTNode visitStartMigrationCheck(final StartMigrationCheckContext ctx) {
+        return new StartMigrationCheckStatement(getIdentifierValue(ctx.jobId()));
+    }
+    
+    @Override
+    public ASTNode visitStopMigrationCheck(final StopMigrationCheckContext ctx) {
+        return new StopMigrationCheckStatement(getIdentifierValue(ctx.jobId()));
+    }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StartMigrationCheckStatement.java
similarity index 64%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
copy to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StartMigrationCheckStatement.java
index 40237cd1699..527eacfc647 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StartMigrationCheckStatement.java
@@ -15,23 +15,18 @@
  * limitations under the License.
  */
 
-grammar MigrationDistSQLStatement;
+package org.apache.shardingsphere.migration.distsql.statement;
 
-import Symbol, RALStatement, RQLStatement;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
 
-execute
-    : (showMigrationList
-    | showMigrationStatus
-    | migrateTable
-    | startMigration
-    | stopMigration
-    | rollbackMigration
-    | commitMigration
-    | checkMigration
-    | showMigrationCheckAlgorithms
-    | addMigrationSourceResource
-    | dropMigrationSourceResource
-    | showMigrationSourceResources
-    | showMigrationCheckStatus
-    ) SEMI?
-    ;
+/**
+ * Start migration check statement.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class StartMigrationCheckStatement extends UpdatableScalingRALStatement {
+    
+    private final String jobId;
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StopMigrationCheckStatement.java
similarity index 64%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
copy to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StopMigrationCheckStatement.java
index 40237cd1699..3f8b5bf3332 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/StopMigrationCheckStatement.java
@@ -15,23 +15,18 @@
  * limitations under the License.
  */
 
-grammar MigrationDistSQLStatement;
+package org.apache.shardingsphere.migration.distsql.statement;
 
-import Symbol, RALStatement, RQLStatement;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
 
-execute
-    : (showMigrationList
-    | showMigrationStatus
-    | migrateTable
-    | startMigration
-    | stopMigration
-    | rollbackMigration
-    | commitMigration
-    | checkMigration
-    | showMigrationCheckAlgorithms
-    | addMigrationSourceResource
-    | dropMigrationSourceResource
-    | showMigrationSourceResources
-    | showMigrationCheckStatus
-    ) SEMI?
-    ;
+/**
+ * Stop migration check statement.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class StopMigrationCheckStatement extends UpdatableScalingRALStatement {
+    
+    private final String jobId;
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index cb6ed9ac55d..5e226ce1124 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -45,4 +45,18 @@ public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, Requ
      * @return latest data consistency check result
      */
     Map<String, DataConsistencyCheckResult> getLatestDataConsistencyCheckResult(String jobId);
+    
+    /**
+     * Start by parent job id.
+     *
+     * @param parentJobId parent job id
+     */
+    void startByParentJobId(String parentJobId);
+    
+    /**
+     * Stop by parent job id.
+     *
+     * @param parentJobId parent job id
+     */
+    void stopByParentJobId(String parentJobId);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
index 3102c2bbb9d..8576a374cae 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
@@ -35,7 +35,7 @@ public final class ConsistencyCheckJobConfiguration implements PipelineJobConfig
     
     private final String jobId;
     
-    private final String referredJobId;
+    private final String parentJobId;
     
     private final String algorithmTypeName;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
index e7e07cdb678..64ce12e1029 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
@@ -32,7 +32,7 @@ public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlCo
     public YamlConsistencyCheckJobConfiguration swapToYamlConfiguration(final ConsistencyCheckJobConfiguration data) {
         YamlConsistencyCheckJobConfiguration result = new YamlConsistencyCheckJobConfiguration();
         result.setJobId(data.getJobId());
-        result.setParentJobId(data.getReferredJobId());
+        result.setParentJobId(data.getParentJobId());
         result.setAlgorithmTypeName(data.getAlgorithmTypeName());
         result.setAlgorithmProperties(data.getAlgorithmProperties());
         return result;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index 3dd111050cd..50f7272f2fc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -58,6 +58,11 @@ public enum JobStatus {
      */
     FINISHED(false),
     
+    /**
+     * Consistency check job execution failed.
+     */
+    CONSISTENCY_CHECK_FAILURE(false),
+    
     /**
      * Task has stopped by failing to prepare work.
      */
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index bf47fc38033..edbc645a141 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Governance repository API.
@@ -62,7 +63,7 @@ public interface GovernanceRepositoryAPI {
      * @param jobId job id
      * @return check job id
      */
-    String getCheckLatestJobId(String jobId);
+    Optional<String> getCheckLatestJobId(String jobId);
     
     /**
      * Persist check latest result.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 3af215a37b2..3baf3db9f5f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -37,6 +37,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
@@ -64,8 +65,8 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     }
     
     @Override
-    public String getCheckLatestJobId(final String jobId) {
-        return repository.get(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId));
+    public Optional<String> getCheckLatestJobId(final String jobId) {
+        return Optional.ofNullable(repository.get(PipelineMetaDataNode.getCheckLatestJobIdPath(jobId)));
     }
     
     @Override
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4 b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
similarity index 56%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
index 40237cd1699..c0835346d6d 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/migration/org/apache/shardingsphere/distsql/parser/autogen/MigrationDistSQLStatement.g4
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
@@ -15,23 +15,19 @@
  * limitations under the License.
  */
 
-grammar MigrationDistSQLStatement;
+package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import Symbol, RALStatement, RQLStatement;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
-execute
-    : (showMigrationList
-    | showMigrationStatus
-    | migrateTable
-    | startMigration
-    | stopMigration
-    | rollbackMigration
-    | commitMigration
-    | checkMigration
-    | showMigrationCheckAlgorithms
-    | addMigrationSourceResource
-    | dropMigrationSourceResource
-    | showMigrationSourceResources
-    | showMigrationCheckStatus
-    ) SEMI?
-    ;
+/**
+ * Pipeline job has already finished exception.
+ */
+public final class PipelineJobHasAlreadyFinishedException extends PipelineSQLException {
+    
+    private static final long serialVersionUID = 2854259384634892428L;
+    
+    public PipelineJobHasAlreadyFinishedException(final String message) {
+        super(XOpenSQLState.GENERAL_ERROR, 88, message);
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 14cb1c04245..030e2e4ba0c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -34,7 +34,9 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDa
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
 
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -52,24 +54,32 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
         String checkJobId = shardingContext.getJobName();
         setJobId(checkJobId);
         ConsistencyCheckJobConfiguration consistencyCheckJobConfig = YamlConsistencyCheckJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
-        ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, JobStatus.FINISHED);
+        JobStatus status = JobStatus.RUNNING;
+        ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, status);
         jobAPI.persistJobItemProgress(jobItemContext);
-        String referredJobId = consistencyCheckJobConfig.getReferredJobId();
-        log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, referredJobId);
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(referredJobId, checkJobId);
-        JobType jobType = PipelineJobIdUtils.parseJobType(referredJobId);
+        String parentJobId = consistencyCheckJobConfig.getParentJobId();
+        log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, parentJobId);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(parentJobId, checkJobId);
+        JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
         InventoryIncrementalJobPublicAPI jobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
-        Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult;
-        if (StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())) {
-            dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(referredJobId);
-        } else {
-            dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(referredJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProperties());
+        Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = Collections.emptyMap();
+        try {
+            if (StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())) {
+                dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(parentJobId);
+            } else {
+                dataConsistencyCheckResult = jobPublicAPI.dataConsistencyCheck(parentJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProperties());
+            }
+            status = JobStatus.FINISHED;
+        } catch (final SQLWrapperException ex) {
+            log.error("data consistency check failed", ex);
+            status = JobStatus.CONSISTENCY_CHECK_FAILURE;
+            jobAPI.persistJobItemErrorMessage(checkJobId, 0, ex);
         }
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(referredJobId, checkJobId, dataConsistencyCheckResult);
-        jobItemContext.setStatus(JobStatus.FINISHED);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
+        jobItemContext.setStatus(status);
         jobAPI.persistJobItemProgress(jobItemContext);
         jobAPI.stop(checkJobId);
-        log.info("execute consistency check job finished, job id:{}, referred job id:{}", checkJobId, referredJobId);
+        log.info("execute consistency check job finished, job id:{}, parent job id:{}", checkJobId, parentJobId);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 1512158497c..207b0954760 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -40,6 +40,8 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyFinishedException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -47,6 +49,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Consistency check job API impl.
@@ -65,15 +68,15 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     @Override
     public String createJobAndStart(final CreateConsistencyCheckJobParameter parameter) {
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
-        String checkLatestJobId = repositoryAPI.getCheckLatestJobId(parameter.getJobId());
-        if (StringUtils.isNotBlank(checkLatestJobId)) {
-            PipelineJobItemProgress progress = getJobItemProgress(checkLatestJobId, 0);
+        Optional<String> optional = repositoryAPI.getCheckLatestJobId(parameter.getJobId());
+        if (optional.isPresent()) {
+            PipelineJobItemProgress progress = getJobItemProgress(optional.get(), 0);
             if (null != progress && JobStatus.FINISHED != progress.getStatus()) {
                 log.info("check job already existed and status isn't FINISHED, status {}", progress.getStatus());
-                throw new PipelineJobHasAlreadyExistedException(checkLatestJobId);
+                throw new PipelineJobHasAlreadyExistedException(optional.get());
             }
         }
-        int consistencyCheckVersionNew = null == checkLatestJobId ? 0 : ConsistencyCheckJobId.getSequence(checkLatestJobId) + 1;
+        int consistencyCheckVersionNew = optional.map(s -> ConsistencyCheckJobId.getSequence(s) + 1).orElse(0);
         YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration();
         ConsistencyCheckJobId checkJobId = new ConsistencyCheckJobId(parameter.getJobId(), consistencyCheckVersionNew);
         String result = marshalJobId(checkJobId);
@@ -88,11 +91,11 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     
     @Override
     public Map<String, DataConsistencyCheckResult> getLatestDataConsistencyCheckResult(final String jobId) {
-        String checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(jobId);
-        if (StringUtils.isBlank(checkLatestJobId)) {
+        Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(jobId);
+        if (!checkLatestJobId.isPresent()) {
             return Collections.emptyMap();
         }
-        return PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(jobId, checkLatestJobId);
+        return PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(jobId, checkLatestJobId.get());
     }
     
     @Override
@@ -126,6 +129,51 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
         PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(PROGRESS_SWAPPER.swapToYamlConfiguration(jobProgress)));
     }
     
+    /**
+     * Start the disabled check job, rejecting a check job whose progress is already FINISHED.
+     *
+     * @param jobId check job id
+     */
+    @Override
+    public void startDisabledJob(final String jobId) {
+        log.info("start disable check job {}", jobId);
+        PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
+        if (null != jobProgress && JobStatus.FINISHED == jobProgress.getStatus()) {
+            // createJobAndStart builds a new check job from the parent job id, so the hint must carry the parent job id rather than this check job id.
+            String parentJobId = getJobConfiguration(jobId).getParentJobId();
+            String errorMessage = String.format("job already finished, can use `CHECK MIGRATION '%s'` to start a new data consistency check job", parentJobId);
+            throw new PipelineJobHasAlreadyFinishedException(errorMessage);
+        }
+        super.startDisabledJob(jobId);
+    }
+    
+    /**
+     * Start the latest check job recorded for the parent job.
+     *
+     * @param parentJobId parent job id
+     */
+    @Override
+    public void startByParentJobId(final String parentJobId) {
+        log.info("start check job by parentJobId {}", parentJobId);
+        startDisabledJob(getLatestCheckJobId(parentJobId));
+    }
+    
+    /**
+     * Stop the latest check job recorded for the parent job.
+     *
+     * @param parentJobId parent job id
+     */
+    @Override
+    public void stopByParentJobId(final String parentJobId) {
+        log.info("stop check job by parentJobId {}", parentJobId);
+        stop(getLatestCheckJobId(parentJobId));
+    }
+    
+    // Look up the latest check job id stored for the parent job; throws PipelineJobNotFoundException when none is recorded.
+    private String getLatestCheckJobId(final String parentJobId) {
+        return PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId).orElseThrow(() -> new PipelineJobNotFoundException(parentJobId + " check job"));
+    }
+    
     @Override
     public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) {
         return getJobConfiguration(getElasticJobConfigPOJO(jobId));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 24aac28d387..9d3d1f91e2e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -53,6 +53,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineCol
 import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
@@ -64,6 +65,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSche
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
@@ -97,6 +99,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -406,6 +409,25 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
         return result;
     }
     
+    /**
+     * Start the disabled migration job, then try to resume its latest consistency check job when one is recorded.
+     *
+     * @param jobId migration job id
+     */
+    @Override
+    public void startDisabledJob(final String jobId) {
+        super.startDisabledJob(jobId);
+        Optional<String> latestCheckJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(jobId);
+        if (!latestCheckJobId.isPresent()) {
+            return;
+        }
+        try {
+            ConsistencyCheckJobAPIFactory.getInstance().startDisabledJob(latestCheckJobId.get());
+        } catch (final org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyFinishedException ignored) {
+            // The migration job was already started above; a check job that has finished has nothing to resume, so it must not fail this call.
+        }
+    }
+    
     @Override
     public JobType getJobType() {
         return JobType.MIGRATION;
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-env/src/test/java/org/apache/shardingsphere/test/integration/env/container/atomic/storage/impl/OpenGaussContainer.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-env/src/test/java/org/apache/shardingsphere/test/integration/env/container/atomic/storage/impl/OpenGaussContainer.java
index 743ed014c9e..7d7b448921f 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-env/src/test/java/org/apache/shardingsphere/test/integration/env/container/atomic/storage/impl/OpenGaussContainer.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-env/src/test/java/org/apache/shardingsphere/test/integration/env/container/atomic/storage/impl/OpenGaussContainer.java
@@ -47,8 +47,8 @@ public final class OpenGaussContainer extends DockerStorageContainer {
         addEnvs(storageContainerConfiguration.getContainerEnvironments());
         mapResources(storageContainerConfiguration.getMountedResources());
         withPrivilegedMode(true);
-        withStartupTimeout(Duration.of(120, ChronoUnit.SECONDS));
         super.configure();
+        withStartupTimeout(Duration.of(120, ChronoUnit.SECONDS));
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index dae115c71c3..96c6ef808ea 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -282,7 +282,8 @@ public abstract class BaseITCase {
             for (Map<String, Object> each : listJobStatus) {
                 assertTrue(StringUtils.isBlank(each.get("error_message").toString()));
                 actualStatus.add(each.get("status").toString());
-                incrementalIdleSecondsList.add(Integer.parseInt(each.get("incremental_idle_seconds").toString()));
+                String incrementalIdleSeconds = each.get("incremental_idle_seconds").toString();
+                incrementalIdleSecondsList.add(StringUtils.isBlank(incrementalIdleSeconds) ? 0 : Integer.parseInt(incrementalIdleSeconds));
             }
             assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
                     JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
index 65907b3c29a..5545cc94dfe 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
@@ -17,39 +17,69 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public final class ConsistencyCheckJobAPIImplTest {
     
-    private static ConsistencyCheckJobAPI jobAPI;
+    private static ConsistencyCheckJobAPI checkJobAPI;
+    
+    private static MigrationJobAPI migrationJobAPI;
     
     @BeforeClass
     public static void beforeClass() {
         PipelineContextUtil.mockModeConfigAndContextManager();
-        jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+        checkJobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+        migrationJobAPI = MigrationJobAPIFactory.getInstance();
     }
     
     @Test
     public void assertCreateJobConfig() {
         String migrationJobId = "j0101test";
         CreateConsistencyCheckJobParameter parameter = new CreateConsistencyCheckJobParameter(migrationJobId, null, null);
-        String checkJobId = jobAPI.createJobAndStart(parameter);
-        ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) jobAPI.getJobConfiguration(checkJobId);
+        String checkJobId = checkJobAPI.createJobAndStart(parameter);
+        ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) checkJobAPI.getJobConfiguration(checkJobId);
         String expectCheckJobId = "j0201j0101test0";
         assertThat(jobConfig.getJobId(), is(expectCheckJobId));
         assertNull(jobConfig.getAlgorithmTypeName());
         int consistencyCheckVersion = ConsistencyCheckJobId.getSequence(expectCheckJobId);
         assertThat(consistencyCheckVersion, is(0));
     }
+    
+    @Test
+    public void assertGetLatestDataConsistencyCheckResult() {
+        Optional<String> jobId = migrationJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+        assertTrue(jobId.isPresent());
+        String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId.get(), null, null));
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(jobId.get(), checkJobId);
+        Map<String, DataConsistencyCheckResult> expectResult = Collections.singletonMap("t_order", new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(1, 1),
+                new DataConsistencyContentCheckResult(true)));
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(jobId.get(), checkJobId, expectResult);
+        Map<String, DataConsistencyCheckResult> actualCheckResult = checkJobAPI.getLatestDataConsistencyCheckResult(jobId.get());
+        assertThat(actualCheckResult.size(), is(expectResult.size()));
+        assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(), is(expectResult.get("t_order").getCountCheckResult().isMatched()));
+        assertThat(actualCheckResult.get("t_order").getContentCheckResult().isMatched(), is(expectResult.get("t_order").getContentCheckResult().isMatched()));
+    }
 }