You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/10/11 07:57:45 UTC

[shardingsphere] branch master updated: Revise show migration check status DistSQL impl; Clean unused dataConsistencyCheck methods (#21488)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 486a1565a31 Revise show migration check status DistSQL impl; Clean unused dataConsistencyCheck methods (#21488)
486a1565a31 is described below

commit 486a1565a31ce11ca09e0360621dbfb81d7c9d18
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Oct 11 15:57:36 2022 +0800

    Revise show migration check status DistSQL impl; Clean unused dataConsistencyCheck methods (#21488)
    
    * Revise show migration check status DistSQL impl
    
    * Simplify aggregateDataConsistencyCheckResults impl
    
    * Clean unused dataConsistencyCheck methods; Improve dataConsistencyCheck unit test
    
    * Remove unused transient
    
    * Recover wait time in IT
    
    * Fix getJobProgressInfo
    
    * Convert "NULL" to "" in check status
    
    * Fix setRecordsCount too late
    
    * Rename to emptyIfNull
---
 .../ShowMigrationCheckStatusQueryResultSet.java    | 13 +++++--
 .../api/InventoryIncrementalJobPublicAPI.java      | 20 ----------
 .../consistency/DataConsistencyCheckResult.java    |  9 +++++
 .../api/pojo/ConsistencyCheckJobProgressInfo.java  |  2 +-
 .../core/api/InventoryIncrementalJobAPI.java       |  2 +-
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 22 +----------
 ...SingleTableInventoryDataConsistencyChecker.java | 25 +++++++------
 .../ConsistencyCheckJobAPIImpl.java                | 43 ++++++++++++----------
 .../migration/MigrationDataConsistencyChecker.java |  7 +---
 .../yaml/metadata/YamlPipelineColumnMetaData.java  |  4 ++
 .../data/pipeline/cases/base/BaseITCase.java       |  2 +-
 .../cases/migration/AbstractMigrationITCase.java   |  2 +-
 .../core/api/impl/MigrationJobAPIImplTest.java     | 24 ++++--------
 .../core/util/JobConfigurationBuilder.java         |  6 ++-
 14 files changed, 77 insertions(+), 104 deletions(-)

diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
index e893f660de8..5c4a3b0247b 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
@@ -47,15 +47,20 @@ public final class ShowMigrationCheckStatusQueryResultSet implements DatabaseDis
         ConsistencyCheckJobProgressInfo progressInfo = JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
         List<Collection<Object>> result = new LinkedList<>();
         String checkResult = null == progressInfo.getResult() ? "" : progressInfo.getResult().toString();
-        result.add(Arrays.asList(progressInfo.getTableName(), checkResult, String.valueOf(progressInfo.getFinishedPercentage()),
-                ObjectUtils.defaultIfNull(progressInfo.getRemainingSeconds(), ""), progressInfo.getCheckBeginTime(), ObjectUtils.defaultIfNull(progressInfo.getCheckEndTime(), ""),
-                ObjectUtils.defaultIfNull(progressInfo.getDurationSeconds(), ""), progressInfo.getErrorMessage()));
+        result.add(Arrays.asList(emptyIfNull(progressInfo.getTableNames()), checkResult, String.valueOf(progressInfo.getFinishedPercentage()),
+                emptyIfNull(progressInfo.getRemainingSeconds()),
+                emptyIfNull(progressInfo.getCheckBeginTime()), emptyIfNull(progressInfo.getCheckEndTime()),
+                emptyIfNull(progressInfo.getDurationSeconds()), emptyIfNull(progressInfo.getErrorMessage())));
         data = result.iterator();
     }
     
+    private Object emptyIfNull(final Object object) {
+        return ObjectUtils.defaultIfNull(object, "");
+    }
+    
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("table_name", "result", "finished_percentage", "remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds", "error_message");
+        return Arrays.asList("tables", "result", "finished_percentage", "remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds", "error_message");
     }
     
     @Override
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index 47b0051238c..20d50211524 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.api;
 
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -27,7 +26,6 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Properties;
 
 /**
  * Inventory incremental job public API.
@@ -92,22 +90,4 @@ public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI,
      * @return data consistency check algorithms
      */
     Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms();
-    
-    /**
-     * Do data consistency check.
-     *
-     * @param jobId job id
-     * @return each logic table check result
-     */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
-    
-    /**
-     * Do data consistency check.
-     *
-     * @param jobId job id
-     * @param algorithmType algorithm type
-     * @param algorithmProps algorithm props. Nullable
-     * @return each logic table check result
-     */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType, Properties algorithmProps);
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
index 00bf9120381..71b696fcded 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
@@ -32,4 +32,13 @@ public final class DataConsistencyCheckResult {
     private final DataConsistencyCountCheckResult countCheckResult;
     
     private final DataConsistencyContentCheckResult contentCheckResult;
+    
+    /**
+     * Is count and content matched.
+     *
+     * @return matched or not
+     */
+    public boolean isMatched() {
+        return countCheckResult.isMatched() && contentCheckResult.isMatched();
+    }
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
index 59604a4e991..e59755a9ccc 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
@@ -26,7 +26,7 @@ import lombok.Data;
 @Data
 public final class ConsistencyCheckJobProgressInfo {
     
-    private String tableName;
+    private String tableNames;
     
     private Boolean result;
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 668724a5d56..79ad6b18c56 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -57,7 +57,7 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
      *
      * @param pipelineJobConfig job configuration
      * @param calculateAlgorithm calculate algorithm
-     * @param checkJobItemContext consistency check job progress listener
+     * @param checkJobItemContext consistency check job item context
      * @return each logic table check result
      */
     Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm,
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 19be5acdd2c..55e0e245383 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -155,23 +155,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
                 : DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps);
     }
     
-    @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
-        checkModeConfig();
-        log.info("Data consistency check for job {}", jobId);
-        PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        DataConsistencyCalculateAlgorithm calculateAlgorithm = buildDataConsistencyCalculateAlgorithm(jobConfig, null, null);
-        return dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
-    }
-    
-    @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType, final Properties algorithmProps) {
-        checkModeConfig();
-        log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
-        PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        return dataConsistencyCheck(jobConfig, buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType, algorithmProps), null);
-    }
-    
     @Override
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm,
                                                                         final ConsistencyCheckJobItemContext checkJobItemContext) {
@@ -193,10 +176,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         }
         for (Entry<String, DataConsistencyCheckResult> entry : checkResults.entrySet()) {
             DataConsistencyCheckResult checkResult = entry.getValue();
-            boolean isCountMatched = checkResult.getCountCheckResult().isMatched();
-            boolean isContentMatched = checkResult.getContentCheckResult().isMatched();
-            if (!isCountMatched || !isContentMatched) {
-                log.error("job: {}, table: {} data consistency check failed, count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched, isContentMatched);
+            if (!checkResult.isMatched()) {
                 return false;
             }
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 3547a2a1f48..a9eaa29f2a2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -100,12 +100,20 @@ public final class SingleTableInventoryDataConsistencyChecker {
     }
     
     private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor,
-                                             final ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
+                                             final ConsistencyCheckJobItemContext checkJobItemContext) {
         String sourceDatabaseType = sourceDataSource.getDatabaseType().getType();
         String targetDatabaseType = targetDataSource.getDatabaseType().getType();
         String sourceTableName = sourceTable.getTableName().getOriginal();
         String schemaName = sourceTable.getSchemaName().getOriginal();
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, sourceTableName);
+        if (null != checkJobItemContext) {
+            checkJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
+            InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
+            Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
+            long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+            checkJobItemContext.setRecordsCount(recordsCount);
+            log.info("check, get records count: {}", recordsCount);
+        }
         ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
         Collection<String> columnNames = tableMetaData.getColumnNames();
         DataConsistencyCalculateParameter sourceParameter = buildParameter(
@@ -114,13 +122,6 @@ public final class SingleTableInventoryDataConsistencyChecker {
                 targetDataSource, targetTable.getSchemaName().getOriginal(), targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
         Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParameter).iterator();
         Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParameter).iterator();
-        if (null != consistencyCheckJobItemContext) {
-            consistencyCheckJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
-            InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
-            Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
-            long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
-            consistencyCheckJobItemContext.setRecordsCount(recordsCount);
-        }
         long sourceRecordsCount = 0;
         long targetRecordsCount = 0;
         boolean contentMatched = true;
@@ -139,12 +140,12 @@ public final class SingleTableInventoryDataConsistencyChecker {
                 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
                 break;
             }
-            if (null != consistencyCheckJobItemContext) {
-                consistencyCheckJobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+            if (null != checkJobItemContext) {
+                checkJobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
             }
         }
-        if (null != consistencyCheckJobItemContext) {
-            consistencyCheckJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
+        if (null != checkJobItemContext) {
+            checkJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
         }
         return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index c46efa359f1..60f9e0da017 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -36,11 +36,13 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgr
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyFinishedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
@@ -49,13 +51,12 @@ import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
-import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Collections;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -64,6 +65,8 @@ import java.util.Optional;
 @Slf4j
 public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl implements ConsistencyCheckJobAPI {
     
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+    
     private final YamlConsistencyCheckJobProgressSwapper swapper = new YamlConsistencyCheckJobProgressSwapper();
     
     @Override
@@ -175,33 +178,35 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
         if (null == jobItemProgress) {
             return result;
         }
-        int finishedPercentage;
         LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
-        if (null != jobItemProgress.getRecordsCount() && Objects.equals(jobItemProgress.getCheckedRecordsCount(), jobItemProgress.getRecordsCount())) {
-            finishedPercentage = 100;
+        if (null == jobItemProgress.getRecordsCount()) {
+            result.setFinishedPercentage(0);
+            result.setResult(false);
+            return result;
+        }
+        long recordsCount = jobItemProgress.getRecordsCount();
+        if (JobStatus.FINISHED == jobItemProgress.getStatus()) {
+            result.setFinishedPercentage(100);
             LocalDateTime checkEndTime = new Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
             Duration duration = Duration.between(checkBeginTime, checkEndTime);
-            result.setDurationSeconds(duration.toMillis() / 1000);
+            result.setDurationSeconds(duration.getSeconds());
             result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
             result.setRemainingSeconds(0L);
         } else {
-            if (null == jobItemProgress.getRecordsCount()) {
-                finishedPercentage = 0;
-            } else {
-                finishedPercentage = Math.min(100, BigDecimal.valueOf(Math.floorDiv(jobItemProgress.getCheckedRecordsCount() * 100, jobItemProgress.getRecordsCount())).intValue());
-                Duration duration = Duration.between(checkBeginTime, LocalDateTime.now());
-                long remainMills = jobItemProgress.getRecordsCount() * 100 / jobItemProgress.getCheckedRecordsCount() * duration.toMillis();
-                result.setRemainingSeconds(remainMills / 1000);
-            }
+            long checkedRecordsCount = Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
+            result.setFinishedPercentage((int) (checkedRecordsCount * 100 / recordsCount));
+            Duration duration = Duration.between(checkBeginTime, LocalDateTime.now());
+            result.setDurationSeconds(duration.getSeconds());
+            long remainingMills = (recordsCount - checkedRecordsCount) / recordsCount * duration.toMillis();
+            result.setRemainingSeconds(remainingMills / 1000);
         }
-        result.setFinishedPercentage(finishedPercentage);
-        String tableName = null == jobItemProgress.getTableNames() ? null : jobItemProgress.getTableNames().split(",")[0];
-        result.setTableName(Optional.ofNullable(tableName).orElse(""));
+        String tableNames = jobItemProgress.getTableNames();
+        result.setTableNames(Optional.ofNullable(tableNames).orElse(""));
         result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime));
         result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
         Map<String, DataConsistencyCheckResult> checkJobResult = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(parentJobId, checkJobId);
-        Optional<DataConsistencyCheckResult> dataConsistencyCheckResult = Optional.ofNullable(checkJobResult.get(tableName));
-        dataConsistencyCheckResult.ifPresent(optional -> result.setResult(optional.getContentCheckResult().isMatched()));
+        InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(parentJobId));
+        result.setResult(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult));
         return result;
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 91fa69b6fdc..f6b97a2a455 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMap
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
@@ -75,12 +74,10 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getTarget());
         SchemaTableName sourceTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getSourceTableName())), new TableName(jobConfig.getSourceTableName()));
         SchemaTableName targetTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())), new TableName(jobConfig.getTargetTableName()));
-        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
-        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
         Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
         try (
-                PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
-                PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
+                PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getSource());
+                PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
             PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
             SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(), sourceDataSource, targetDataSource,
                     sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java
index e6f88962b17..c4a96dc64e4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java
@@ -17,12 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.yaml.metadata;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 /**
  * Yaml pipeline column meta data.
  */
+@NoArgsConstructor
+@AllArgsConstructor
 @Data
 public final class YamlPipelineColumnMetaData implements YamlConfiguration {
     
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index f269624890e..328bbfc0b1d 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -286,7 +286,7 @@ public abstract class BaseITCase {
             }
             assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
                     JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
-            if (Collections.min(incrementalIdleSecondsList) < 15) {
+            if (Collections.min(incrementalIdleSecondsList) <= 5) {
                 ThreadUtil.sleep(3, TimeUnit.SECONDS);
                 continue;
             }
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index a04d7224de4..ac23115bf9b 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -172,7 +172,7 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
             if (checkEndTimeList.size() == resultList.size()) {
                 break;
             }
-            ThreadUtil.sleep(5, TimeUnit.SECONDS);
+            ThreadUtil.sleep(3, TimeUnit.SECONDS);
         }
         log.info("check job results: {}", resultList);
         for (Map<String, Object> entry : resultList) {
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 9cd96221ed8..452001a8b4a 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -27,17 +27,16 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.junit.AfterClass;
@@ -149,22 +148,13 @@ public final class MigrationJobAPIImplTest {
     }
     
     @Test
-    public void assertDataConsistencyCheck() throws NoSuchFieldException, IllegalAccessException {
-        MigrationJobConfiguration jobConfiguration = JobConfigurationBuilder.createJobConfiguration();
-        ReflectionUtil.setFieldValue(jobConfiguration, "uniqueKeyColumn", new PipelineColumnMetaData(1, "order_id", 4, "", false, true, true));
-        Optional<String> jobId = jobAPI.start(jobConfiguration);
-        assertTrue(jobId.isPresent());
-        initTableData(jobAPI.getJobConfiguration(jobId.get()));
-        Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobId.get());
-        assertThat(checkResultMap.size(), is(1));
-    }
-    
-    @Test
-    public void assertDataConsistencyCheckWithAlgorithm() {
-        Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+    public void assertDataConsistencyCheck() {
+        MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+        initTableData(jobConfig);
+        Optional<String> jobId = jobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
-        initTableData(jobAPI.getJobConfiguration(jobId.get()));
-        Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE", null);
+        DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(jobConfig, "FIXTURE", null);
+        Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
         assertThat(checkResultMap.size(), is(1));
         assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
         assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(), is(2L));
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 31a0d5ea6a4..72f4bb695d3 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -21,8 +21,6 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -31,6 +29,9 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.yaml.metadata.YamlPipelineColumnMetaData;
 
 /**
  * Job configuration builder.
@@ -53,6 +54,7 @@ public final class JobConfigurationBuilder {
         result.setSource(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("migration_standard_jdbc_source.yaml"))));
         result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration(
                 ConfigurationFileUtil.readFile("migration_sharding_sphere_jdbc_target.yaml"))));
+        result.setUniqueKeyColumn(new YamlPipelineColumnMetaData(1, "order_id", 4, "", false, true, true));
         PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
         return new YamlMigrationJobConfigurationSwapper().swapToObject(result);
     }