Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/11 03:24:57 UTC

[shardingsphere] branch master updated: Refactor show migration check status DistSQL implementation (#21441)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c4b847a7a2c Refactor show migration check status DistSQL implementation (#21441)
c4b847a7a2c is described below

commit c4b847a7a2c737a1885ead254a5c9ae229f64136
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Oct 11 11:24:45 2022 +0800

    Refactor show migration check status DistSQL implementation (#21441)
    
    * Refactor show migration check status DistSQL implementation
    
    * Fix ci
    
    * Fix conflict
    
    * Fix ci
    
    * Fix codestyle
    
    * Fix name
---
 .../ShowMigrationCheckStatusQueryResultSet.java    | 24 ++++-----
 .../ShowMigrationCheckStatusStatement.java         |  2 +-
 .../pipeline/api/ConsistencyCheckJobPublicAPI.java |  9 ++++
 .../job/progress/ConsistencyCheckJobProgress.java  | 10 ++++
 .../PipelineJobProgressUpdatedParameter.java       |  4 +-
 .../api/pojo/ConsistencyCheckJobProgressInfo.java} | 24 +++++++--
 .../pipeline/core/api/GovernanceRepositoryAPI.java |  4 +-
 .../core/api/InventoryIncrementalJobAPI.java       |  5 +-
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 14 +++--
 .../core/api/impl/GovernanceRepositoryAPIImpl.java |  4 +-
 ...SingleTableInventoryDataConsistencyChecker.java | 33 ++++++++++--
 .../pipeline/core/importer/DefaultImporter.java    |  2 +-
 .../yaml/YamlConsistencyCheckJobProgress.java      | 10 ++++
 .../YamlConsistencyCheckJobProgressSwapper.java    | 10 ++++
 .../consistencycheck/ConsistencyCheckJob.java      |  3 ++
 .../ConsistencyCheckJobAPIImpl.java                | 61 +++++++++++++++++++---
 .../ConsistencyCheckJobItemContext.java            | 25 ++++++++-
 .../ConsistencyCheckTasksRunner.java               |  3 +-
 .../migration/MigrationDataConsistencyChecker.java | 11 ++--
 .../scenario/migration/MigrationJobAPIImpl.java    |  6 ++-
 .../migration/MigrationJobItemContext.java         |  3 +-
 .../data/pipeline/cases/base/BaseITCase.java       |  3 +-
 .../cases/migration/AbstractMigrationITCase.java   | 17 +++---
 .../core/api/impl/MigrationJobAPIImplTest.java     |  2 +-
 .../MigrationDataConsistencyCheckerTest.java       | 21 +++++++-
 25 files changed, 245 insertions(+), 65 deletions(-)
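
Illustrative only (hypothetical values): after this refactor, SHOW MIGRATION CHECK STATUS returns a single progress row instead of one count/content-match row per table, using the new column set defined in ShowMigrationCheckStatusQueryResultSet below. The timestamp format follows whatever the handler's DATE_TIME_FORMATTER produces; the figures here are made up. For example:

    table_name | result | finished_percentage | remaining_seconds | check_begin_time    | check_end_time      | duration_seconds | error_message
    t_order    | true   | 100                 | 0                 | 2022-10-11 11:20:01 | 2022-10-11 11:21:43 | 102              |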

diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
index 5c90cfbb7b1..e893f660de8 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
@@ -17,22 +17,20 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
 import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 
 /**
  * Show migration check status query result set.
@@ -46,20 +44,18 @@ public final class ShowMigrationCheckStatusQueryResultSet implements DatabaseDis
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
         ShowMigrationCheckStatusStatement checkMigrationStatement = (ShowMigrationCheckStatusStatement) sqlStatement;
-        Map<String, DataConsistencyCheckResult> consistencyCheckResult = JOB_API.getLatestDataConsistencyCheckResult(checkMigrationStatement.getJobId());
-        List<Collection<Object>> result = new ArrayList<>(consistencyCheckResult.size());
-        for (Entry<String, DataConsistencyCheckResult> entry : consistencyCheckResult.entrySet()) {
-            DataConsistencyCheckResult value = entry.getValue();
-            DataConsistencyCountCheckResult countCheckResult = value.getCountCheckResult();
-            result.add(Arrays.asList(entry.getKey(), countCheckResult.getSourceRecordsCount(), countCheckResult.getTargetRecordsCount(), String.valueOf(countCheckResult.isMatched()),
-                    String.valueOf(value.getContentCheckResult().isMatched())));
-        }
+        ConsistencyCheckJobProgressInfo progressInfo = JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
+        List<Collection<Object>> result = new LinkedList<>();
+        String checkResult = null == progressInfo.getResult() ? "" : progressInfo.getResult().toString();
+        result.add(Arrays.asList(progressInfo.getTableName(), checkResult, String.valueOf(progressInfo.getFinishedPercentage()),
+                ObjectUtils.defaultIfNull(progressInfo.getRemainingSeconds(), ""), progressInfo.getCheckBeginTime(), ObjectUtils.defaultIfNull(progressInfo.getCheckEndTime(), ""),
+                ObjectUtils.defaultIfNull(progressInfo.getDurationSeconds(), ""), progressInfo.getErrorMessage()));
         data = result.iterator();
     }
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("table_name", "source_records_count", "target_records_count", "records_count_matched", "records_content_matched");
+        return Arrays.asList("table_name", "result", "finished_percentage", "remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds", "error_message");
     }
     
     @Override
diff --git a/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java b/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
index fe8a36c0e9d..e9b2e6932d5 100644
--- a/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
+++ b/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
@@ -22,7 +22,7 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
 
 /**
- * Show check migration status statement.
+ * Show migration check status statement.
  */
 @RequiredArgsConstructor
 @Getter
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index 008f0002705..2e59d9e3976 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
@@ -59,4 +60,12 @@ public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, Requ
      * @param parentJobId parent job id
      */
     void stopByParentJobId(String parentJobId);
+    
+    /**
+     * Get consistency job progress info.
+     *
+     * @param parentJobId parent job id
+     * @return consistency job progress info
+     */
+    ConsistencyCheckJobProgressInfo getJobProgressInfo(String parentJobId);
 }
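
A minimal caller sketch for the new method, assuming the consistency-check API is obtained through PipelineJobPublicAPIFactory (the accessor name below is an assumption; only getJobProgressInfo(parentJobId) is defined by this commit):

    // assumed factory accessor for the consistency check public API
    ConsistencyCheckJobPublicAPI checkJobAPI = PipelineJobPublicAPIFactory.getConsistencyCheckJobPublicAPI();
    // parentJobId is the migration job id; the latest check job registered under it is resolved internally
    ConsistencyCheckJobProgressInfo progressInfo = checkJobAPI.getJobProgressInfo(parentJobId);
    System.out.printf("%s: %d%% checked, result=%s%n",
            progressInfo.getTableName(), progressInfo.getFinishedPercentage(), progressInfo.getResult());
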
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
index 22a16efa840..e3297179140 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
@@ -31,4 +31,14 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 public final class ConsistencyCheckJobProgress implements PipelineJobItemProgress {
     
     private JobStatus status = JobStatus.RUNNING;
+    
+    private String tableNames;
+    
+    private Long checkedRecordsCount;
+    
+    private Long recordsCount;
+    
+    private Long checkBeginTimeMillis;
+    
+    private Long checkEndTimeMillis;
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
index 1a21a14bb7e..ab195bc42e4 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
@@ -27,7 +27,5 @@ import lombok.RequiredArgsConstructor;
 @Getter
 public final class PipelineJobProgressUpdatedParameter {
     
-    private final int insertedRecordsCount;
-    
-    private final int deletedRecordsCount;
+    private final int processedRecordsCount;
 }
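
The insert/delete counters are collapsed into a single processed-records value. As a worked example of the new semantics (matching the DefaultImporter change below), a flushed batch with 10 inserted and 3 deleted records is now reported as:

    // net record delta reported to progress listeners: 10 - 3 = 7
    new PipelineJobProgressUpdatedParameter(7);

Listeners such as MigrationJobItemContext and ConsistencyCheckJobItemContext simply add processedRecordsCount to their running totals.
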
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
similarity index 65%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
index f0baaf647c8..59604a4e991 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
@@ -15,16 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
+package org.apache.shardingsphere.data.pipeline.api.pojo;
 
 import lombok.Data;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 /**
- * Yaml data consistency check job progress.
+ * Consistency check job progress info.
  */
+
 @Data
-public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
+public final class ConsistencyCheckJobProgressInfo {
+    
+    private String tableName;
+    
+    private Boolean result;
+    
+    private int finishedPercentage;
+    
+    private Long remainingSeconds;
+    
+    private String checkBeginTime;
+    
+    private String checkEndTime;
+    
+    private Long durationSeconds;
     
-    private String status;
+    private String errorMessage;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index e7dc53f1050..17874b2e8bb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -76,11 +76,11 @@ public interface GovernanceRepositoryAPI {
     /**
      * Get check job result.
      *
-     * @param jobId job id
+     * @param parentJobId job id
      * @param checkJobId check job id
      * @return check job result
      */
-    Map<String, DataConsistencyCheckResult> getCheckJobResult(String jobId, String checkJobId);
+    Map<String, DataConsistencyCheckResult> getCheckJobResult(String parentJobId, String checkJobId);
     
     /**
      * Persist check job result.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 077d08c7281..668724a5d56 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 
 import java.util.Map;
@@ -56,9 +57,11 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
      *
      * @param pipelineJobConfig job configuration
      * @param calculateAlgorithm calculate algorithm
+     * @param checkJobItemContext consistency check job progress listener
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm);
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm,
+                                                                 ConsistencyCheckJobItemContext checkJobItemContext);
     
     /**
      * Aggregate data consistency check results.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 2881877cbe6..19be5acdd2c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProce
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
 import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
@@ -160,7 +161,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         log.info("Data consistency check for job {}", jobId);
         PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
         DataConsistencyCalculateAlgorithm calculateAlgorithm = buildDataConsistencyCalculateAlgorithm(jobConfig, null, null);
-        return dataConsistencyCheck(jobConfig, calculateAlgorithm);
+        return dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
     }
     
     @Override
@@ -168,18 +169,21 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         checkModeConfig();
         log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
         PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        return dataConsistencyCheck(jobConfig, buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType, algorithmProps));
+        return dataConsistencyCheck(jobConfig, buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType, algorithmProps), null);
     }
     
     @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm,
+                                                                        final ConsistencyCheckJobItemContext checkJobItemContext) {
         String jobId = jobConfig.getJobId();
-        Map<String, DataConsistencyCheckResult> result = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
+        PipelineDataConsistencyChecker dataConsistencyChecker = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig), checkJobItemContext);
+        Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.check(calculateAlgorithm);
         log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculateAlgorithm.getType(), result);
         return result;
     }
     
-    protected abstract PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext);
+    protected abstract PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext,
+                                                                                          ConsistencyCheckJobItemContext checkJobItemContext);
     
     @Override
     public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
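
The check entry point now takes an optional ConsistencyCheckJobItemContext. A sketch of the two call shapes introduced by this change (identifiers are placeholders):

    // DistSQL-triggered checks keep no per-item progress, so they pass null
    jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
    // the consistency check job passes its item context so progress is tracked and persisted
    jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, checkJobItemContext);
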
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 86ce2c90139..64a876cc8f9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -77,9 +77,9 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     
     @SuppressWarnings("unchecked")
     @Override
-    public Map<String, DataConsistencyCheckResult> getCheckJobResult(final String jobId, final String checkJobId) {
+    public Map<String, DataConsistencyCheckResult> getCheckJobResult(final String parentJobId, final String checkJobId) {
         Map<String, DataConsistencyCheckResult> result = new HashMap<>();
-        String yamlCheckResultMapText = repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
+        String yamlCheckResultMapText = repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(parentJobId, checkJobId));
         if (StringUtils.isBlank(yamlCheckResultMapText)) {
             return Collections.emptyMap();
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 3125fc778fc..3547a2a1f48 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
@@ -26,12 +28,16 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -40,7 +46,9 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.
 
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
@@ -72,6 +80,8 @@ public final class SingleTableInventoryDataConsistencyChecker {
     
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
+    private final ConsistencyCheckJobItemContext consistencyCheckJobItemContext;
+    
     /**
      * Data consistency check.
      *
@@ -82,26 +92,35 @@ public final class SingleTableInventoryDataConsistencyChecker {
         ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) + "-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         try {
-            return check(calculateAlgorithm, executor);
+            return check(calculateAlgorithm, executor, consistencyCheckJobItemContext);
         } finally {
             executor.shutdown();
             executor.shutdownNow();
         }
     }
     
-    private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor) {
+    private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor,
+                                             final ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
         String sourceDatabaseType = sourceDataSource.getDatabaseType().getType();
         String targetDatabaseType = targetDataSource.getDatabaseType().getType();
         String sourceTableName = sourceTable.getTableName().getOriginal();
-        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(sourceTable.getSchemaName().getOriginal(), sourceTableName);
+        String schemaName = sourceTable.getSchemaName().getOriginal();
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, sourceTableName);
         ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
         Collection<String> columnNames = tableMetaData.getColumnNames();
         DataConsistencyCalculateParameter sourceParameter = buildParameter(
-                sourceDataSource, sourceTable.getSchemaName().getOriginal(), sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+                sourceDataSource, schemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
         DataConsistencyCalculateParameter targetParameter = buildParameter(
                 targetDataSource, targetTable.getSchemaName().getOriginal(), targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
         Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParameter).iterator();
         Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParameter).iterator();
+        if (null != consistencyCheckJobItemContext) {
+            consistencyCheckJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
+            InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
+            Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
+            long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+            consistencyCheckJobItemContext.setRecordsCount(recordsCount);
+        }
         long sourceRecordsCount = 0;
         long targetRecordsCount = 0;
         boolean contentMatched = true;
@@ -120,6 +139,12 @@ public final class SingleTableInventoryDataConsistencyChecker {
                 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
                 break;
             }
+            if (null != consistencyCheckJobItemContext) {
+                consistencyCheckJobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+            }
+        }
+        if (null != consistencyCheckJobItemContext) {
+            consistencyCheckJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
         }
         return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
     }
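
When a check job item context is supplied, the total number of records to verify is seeded from the parent migration job's processed-record counts, and every compared chunk advances the checked counter. A hypothetical worked example: if the migration job's sharding items report 120000 and 80000 processed records, recordsCount is set to 200000; a calculated chunk of 1000 source rows then calls

    consistencyCheckJobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(1000));

moving checkedRecordsCount from 0 to 1000 and notifying the progress persist service.
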
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 4b87e973cf9..699fc6d1878 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -125,7 +125,7 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
             flushInternal(dataSource, each.getInsertDataRecords());
             flushInternal(dataSource, each.getUpdateDataRecords());
         }
-        return new PipelineJobProgressUpdatedParameter(insertRecordNumber, deleteRecordNumber);
+        return new PipelineJobProgressUpdatedParameter(insertRecordNumber - deleteRecordNumber);
     }
     
     private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
index f0baaf647c8..d915fce6a1a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
@@ -27,4 +27,14 @@ import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
     
     private String status;
+    
+    private String tableNames;
+    
+    private Long checkedRecordsCount;
+    
+    private Long recordsCount;
+    
+    private Long checkBeginTimeMillis;
+    
+    private Long checkEndTimeMillis;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
index 8796af3c6d8..a3f5dcf20b1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
@@ -30,6 +30,11 @@ public final class YamlConsistencyCheckJobProgressSwapper implements YamlConfigu
     public YamlConsistencyCheckJobProgress swapToYamlConfiguration(final ConsistencyCheckJobProgress data) {
         YamlConsistencyCheckJobProgress result = new YamlConsistencyCheckJobProgress();
         result.setStatus(data.getStatus().name());
+        result.setRecordsCount(data.getRecordsCount());
+        result.setCheckedRecordsCount(data.getCheckedRecordsCount());
+        result.setCheckBeginTimeMillis(data.getCheckBeginTimeMillis());
+        result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
+        result.setTableNames(data.getTableNames());
         return result;
     }
     
@@ -37,6 +42,11 @@ public final class YamlConsistencyCheckJobProgressSwapper implements YamlConfigu
     public ConsistencyCheckJobProgress swapToObject(final YamlConsistencyCheckJobProgress yamlConfig) {
         ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
         result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
+        result.setRecordsCount(yamlConfig.getRecordsCount());
+        result.setCheckedRecordsCount(yamlConfig.getCheckedRecordsCount());
+        result.setCheckBeginTimeMillis(yamlConfig.getCheckBeginTimeMillis());
+        result.setCheckEndTimeMillis(yamlConfig.getCheckEndTimeMillis());
+        result.setTableNames(yamlConfig.getTableNames());
         return result;
     }
 }
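
With the additional fields, the persisted check job item progress now carries the counters and timestamps as well. An illustrative YAML payload (hypothetical values, assuming the default property-to-key mapping used by the swapper and YAML engine):

    status: RUNNING
    tableNames: t_order
    checkedRecordsCount: 50000
    recordsCount: 200000
    checkBeginTimeMillis: 1665458685000

checkEndTimeMillis stays unset until the check finishes.
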
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index b86955e252d..cce3ea67e4c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
@@ -55,6 +56,7 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
         }
         ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
         tasksRunner.start();
+        PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId, shardingContext.getShardingItem());
         getTasksRunnerMap().put(shardingItem, tasksRunner);
     }
     
@@ -74,5 +76,6 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
         getTasksRunnerMap().clear();
         String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
         pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 0);
+        PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index b91daee9f9e..c46efa359f1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -23,8 +23,6 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
@@ -34,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -44,12 +43,19 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
 import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDateTime;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -103,8 +109,14 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     
     @Override
     public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+        ConsistencyCheckJobItemContext checkJobItemContext = (ConsistencyCheckJobItemContext) jobItemContext;
         ConsistencyCheckJobProgress jobProgress = new ConsistencyCheckJobProgress();
         jobProgress.setStatus(jobItemContext.getStatus());
+        jobProgress.setCheckedRecordsCount(checkJobItemContext.getCheckedRecordsCount().get());
+        jobProgress.setRecordsCount(checkJobItemContext.getRecordsCount());
+        jobProgress.setCheckBeginTimeMillis(checkJobItemContext.getCheckBeginTimeMillis());
+        jobProgress.setCheckEndTimeMillis(checkJobItemContext.getCheckEndTimeMillis());
+        jobProgress.setTableNames(null == checkJobItemContext.getTableNames() ? null : String.join(",", checkJobItemContext.getTableNames()));
         YamlConsistencyCheckJobProgress yamlJobProgress = swapper.swapToYamlConfiguration(jobProgress);
         PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
     }
@@ -115,10 +127,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
         if (StringUtils.isBlank(progress)) {
             return null;
         }
-        ConsistencyCheckJobProgress jobProgress = swapper.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobProgress.class, true));
-        ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
-        result.setStatus(jobProgress.getStatus());
-        return result;
+        return swapper.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobProgress.class, true));
     }
     
     @Override
@@ -156,6 +165,46 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
         stop(checkLatestJobId.get());
     }
     
+    @Override
+    public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String parentJobId) {
+        Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+        ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new PipelineJobNotFoundException(parentJobId));
+        String checkJobId = checkLatestJobId.get();
+        ConsistencyCheckJobProgress jobItemProgress = getJobItemProgress(checkJobId, 0);
+        ConsistencyCheckJobProgressInfo result = new ConsistencyCheckJobProgressInfo();
+        if (null == jobItemProgress) {
+            return result;
+        }
+        int finishedPercentage;
+        LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
+        if (null != jobItemProgress.getRecordsCount() && Objects.equals(jobItemProgress.getCheckedRecordsCount(), jobItemProgress.getRecordsCount())) {
+            finishedPercentage = 100;
+            LocalDateTime checkEndTime = new Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
+            Duration duration = Duration.between(checkBeginTime, checkEndTime);
+            result.setDurationSeconds(duration.toMillis() / 1000);
+            result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
+            result.setRemainingSeconds(0L);
+        } else {
+            if (null == jobItemProgress.getRecordsCount()) {
+                finishedPercentage = 0;
+            } else {
+                finishedPercentage = Math.min(100, BigDecimal.valueOf(Math.floorDiv(jobItemProgress.getCheckedRecordsCount() * 100, jobItemProgress.getRecordsCount())).intValue());
+                Duration duration = Duration.between(checkBeginTime, LocalDateTime.now());
+                long remainMills = jobItemProgress.getRecordsCount() * 100 / jobItemProgress.getCheckedRecordsCount() * duration.toMillis();
+                result.setRemainingSeconds(remainMills / 1000);
+            }
+        }
+        result.setFinishedPercentage(finishedPercentage);
+        String tableName = null == jobItemProgress.getTableNames() ? null : jobItemProgress.getTableNames().split(",")[0];
+        result.setTableName(Optional.ofNullable(tableName).orElse(""));
+        result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime));
+        result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
+        Map<String, DataConsistencyCheckResult> checkJobResult = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(parentJobId, checkJobId);
+        Optional<DataConsistencyCheckResult> dataConsistencyCheckResult = Optional.ofNullable(checkJobResult.get(tableName));
+        dataConsistencyCheckResult.ifPresent(optional -> result.setResult(optional.getContentCheckResult().isMatched()));
+        return result;
+    }
+    
     @Override
     public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) {
         return getJobConfiguration(getElasticJobConfigPOJO(jobId));
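
A worked example of the percentage branch above (hypothetical figures): with checkedRecordsCount = 50000 and recordsCount = 200000,

    Math.floorDiv(50000L * 100, 200000L)   // = 25

so finished_percentage is reported as 25; once checkedRecordsCount equals recordsCount, the percentage is fixed at 100 and duration_seconds is derived from the interval between checkBeginTimeMillis and checkEndTimeMillis.
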
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index fb5841d9fde..7b812529dd6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -24,6 +24,12 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJo
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Consistency check job item context.
@@ -31,7 +37,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 @Getter
 @Setter
 @Slf4j
-public final class ConsistencyCheckJobItemContext implements PipelineJobItemContext {
+public final class ConsistencyCheckJobItemContext implements PipelineJobItemContext, PipelineJobProgressListener {
     
     private final String jobId;
     
@@ -43,6 +49,16 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
     
     private volatile JobStatus status;
     
+    private Collection<String> tableNames;
+    
+    private volatile Long recordsCount;
+    
+    private final AtomicLong checkedRecordsCount = new AtomicLong(0);
+    
+    private final long checkBeginTimeMillis;
+    
+    private Long checkEndTimeMillis;
+    
     private final ConsistencyCheckJobConfiguration jobConfig;
     
     public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status) {
@@ -50,10 +66,17 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
         jobId = jobConfig.getJobId();
         this.shardingItem = shardingItem;
         this.status = status;
+        checkBeginTimeMillis = System.currentTimeMillis();
     }
     
     @Override
     public PipelineProcessContext getJobProcessContext() {
         throw new UnsupportedOperationException();
     }
+    
+    @Override
+    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
+        checkedRecordsCount.addAndGet(parameter.getProcessedRecordsCount());
+        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
index 69055b5a85d..a13093312a4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -99,8 +99,9 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
             DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(
                     parentJobConfig, checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
             this.calculateAlgorithm = calculateAlgorithm;
-            Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm);
+            Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, jobItemContext);
             PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
+            jobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
         }
         
         @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 50ab3a39afb..91fa69b6fdc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -57,11 +58,15 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
     
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
-    public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext) {
+    private final ConsistencyCheckJobItemContext checkJobItemContext;
+    
+    public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext,
+                                           final ConsistencyCheckJobItemContext checkJobItemContext) {
         this.jobConfig = jobConfig;
         readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
         tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
                 TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
+        this.checkJobItemContext = checkJobItemContext;
     }
     
     @Override
@@ -77,8 +82,8 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
                 PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
             PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
-            SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(
-                    jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm);
+            SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(), sourceDataSource, targetDataSource,
+                    sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
             result.put(sourceTable.getTableName().getOriginal(), singleTableInventoryChecker.check(calculateAlgorithm));
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index d73e6c07c47..72fe7f49559 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -63,6 +63,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
@@ -241,8 +242,9 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
     }
     
     @Override
-    protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext) {
-        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext);
+    protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
+                                                                                 final ConsistencyCheckJobItemContext checkJobItemContext) {
+        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext, checkJobItemContext);
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index da3eb7b8203..35651826bd2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -132,8 +132,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     
     @Override
     public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
-        int needAddNumber = parameter.getInsertedRecordsCount() - parameter.getDeletedRecordsCount();
-        processedRecordsCount.addAndGet(needAddNumber);
+        processedRecordsCount.addAndGet(parameter.getProcessedRecordsCount());
         PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
     }
     
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 644917216ba..f269624890e 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -22,6 +22,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
@@ -238,7 +239,7 @@ public abstract class BaseITCase {
                 ResultSet resultSet = connection.createStatement().executeQuery(sql);
                 List<Map<String, Object>> result = resultSetToList(resultSet);
                 log.info("proxy query for list, sql: {}, result: {}", sql, result);
-                return result;
+                return ObjectUtils.defaultIfNull(result, Collections.emptyList());
             } catch (final SQLException ex) {
                 log.error("data access error", ex);
             }
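
The query helper above now falls back to an empty list instead of returning null, using Apache Commons Lang's ObjectUtils.defaultIfNull. A small self-contained usage sketch (the wrapper class here is hypothetical):

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.ObjectUtils;

// ObjectUtils.defaultIfNull(value, defaultValue) returns value when it is non-null, otherwise the default;
// callers can iterate the result without a null check.
final class NullSafeQuery {
    
    static List<Map<String, Object>> orEmpty(final List<Map<String, Object>> result) {
        return ObjectUtils.defaultIfNull(result, Collections.emptyList());
    }
}
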
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 5032f649681..a04d7224de4 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.integration.data.pipeline.cases.migration;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseITCase;
 import org.apache.shardingsphere.integration.data.pipeline.command.MigrationDistSQLCommand;
@@ -164,19 +165,19 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
     
     protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) throws SQLException {
         proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
-        List<Map<String, Object>> checkJobResults = Collections.emptyList();
+        List<Map<String, Object>> resultList = Collections.emptyList();
         for (int i = 0; i < 10; i++) {
-            checkJobResults = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
-            if (null != checkJobResults && !checkJobResults.isEmpty()) {
+            resultList = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
+            List<String> checkEndTimeList = resultList.stream().map(map -> map.get("check_end_time").toString()).filter(StringUtils::isNotBlank).collect(Collectors.toList());
+            if (checkEndTimeList.size() == resultList.size()) {
                 break;
             }
             ThreadUtil.sleep(5, TimeUnit.SECONDS);
         }
-        assertTrue(null != checkJobResults && !checkJobResults.isEmpty());
-        log.info("check job results: {}", checkJobResults);
-        for (Map<String, Object> entry : checkJobResults) {
-            assertTrue(Boolean.parseBoolean(entry.get("records_count_matched").toString()));
-            assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
+        log.info("check job results: {}", resultList);
+        for (Map<String, Object> entry : resultList) {
+            assertTrue(Boolean.parseBoolean(entry.get("result").toString()));
+            assertThat(entry.get("finished_percentage").toString(), is("100"));
         }
     }
 }
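
The assertion above now polls SHOW MIGRATION CHECK STATUS until every returned row has a non-blank check_end_time, then verifies result and finished_percentage. The polling shape, extracted as a generic sketch (the helper and its names are hypothetical, not part of the test base class):

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Generic bounded poll: evaluate the condition, return early on success,
// otherwise sleep and retry up to maxAttempts times.
final class Polling {
    
    static boolean await(final BooleanSupplier condition, final int maxAttempts, final long intervalSeconds) throws InterruptedException {
        for (int i = 0; i < maxAttempts; i++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            TimeUnit.SECONDS.sleep(intervalSeconds);
        }
        return false;
    }
}

In the test above the condition is "every returned row has a non-blank check_end_time", polled 10 times at 5-second intervals before the assertions run.
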
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index f8dd4794b7b..9cd96221ed8 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -29,8 +29,8 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
-import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index fa646106a05..1b008f63e67 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -18,12 +18,19 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -31,6 +38,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -46,13 +54,22 @@ public final class MigrationDataConsistencyCheckerTest {
     @Test
     public void assertCountAndDataCheck() throws SQLException {
         MigrationJobConfiguration jobConfig = createJobConfiguration();
-        Map<String, DataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null))
-                .check(new DataConsistencyCalculateAlgorithmFixture());
+        JobConfigurationPOJO jobConfigurationPOJO = new JobConfigurationPOJO();
+        jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobConfig.getJobId(), 0, "");
+        Map<String, DataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
+                createConsistencyCheckJobItemConfig()).check(new DataConsistencyCalculateAlgorithmFixture());
         assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
         assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(), is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
         assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
     }
     
+    private ConsistencyCheckJobItemContext createConsistencyCheckJobItemConfig() {
+        ConsistencyCheckJobConfiguration jobConfig = new ConsistencyCheckJobConfiguration("", "", "", new Properties());
+        return new ConsistencyCheckJobItemContext(jobConfig, 0, JobStatus.RUNNING);
+    }
+    
     private MigrationJobConfiguration createJobConfiguration() throws SQLException {
         MigrationJobItemContext jobItemContext = PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
         initTableData(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());