You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/10/10 05:53:23 UTC

[shardingsphere] branch master updated: Support cancel data consistency check and refactoring (#21429)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c778a7eab7 Support cancel data consistency check and refactoring (#21429)
4c778a7eab7 is described below

commit 4c778a7eab7e1c867c1803568fc1ac7030774b1b
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Mon Oct 10 13:53:13 2022 +0800

    Support cancel data consistency check and refactoring (#21429)
    
    * Move DataConsistencyCalculateAlgorithmFactory
    
    * Add SingleTableInventoryDataConsistencyChecker
    
    * Refactor DataConsistencyCalculateParameter.tableNameSchemaNameMapping to schemaName
    
    * Add DataConsistencyCalculatedResult, calculate count and content together
    
    * Refactor SingleTableInventoryDataConsistencyChecker by DataConsistencyCalculatedResult
    
    * Remove buildCountSQL from PipelineSQLBuilder
    
    * Unit test
    
    * Revert "Remove buildCountSQL from PipelineSQLBuilder"
    
    This reverts commit ea946fcce577fe2aee72592176657464f4952753.
    
    * Add CRC32_MATCH in MySQLMigrationGeneralIT
    
    * Add log when crc32 not match
    
    * Add cancel() for DataConsistencyCalculateAlgorithm and abstract impl
    
    * Recover PipelineTask start stop methods
    
    * Add ConsistencyCheckTasksRunner and refactor ConsistencyCheckJob
    
    * Refactor AbstractPipelineJob to use common JobBootstrap
    
    * Cancel DataConsistencyCalculateAlgorithm on stopping
    
    * Log
---
 .../DataConsistencyCalculateParameter.java         |   4 +-
 .../DataConsistencyCalculatedResult.java           |  21 +--
 .../DataConsistencyCalculateAlgorithm.java         |  12 +-
 .../DataConsistencyCalculateAlgorithmFactory.java  |   3 +-
 .../spi/sqlbuilder/PipelineSQLBuilder.java         |   1 +
 .../core/api/InventoryIncrementalJobAPI.java       |  15 +-
 .../AbstractInventoryIncrementalJobAPIImpl.java    |  31 ++--
 .../DataConsistencyCalculateAlgorithmChooser.java  |   1 +
 ...SingleTableInventoryDataConsistencyChecker.java | 148 ++++++++++++++++++
 .../AbstractDataConsistencyCalculateAlgorithm.java |  63 ++++++++
 ...StreamingDataConsistencyCalculateAlgorithm.java |  20 +--
 ...RC32MatchDataConsistencyCalculateAlgorithm.java |  76 ++++++++--
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  27 ++--
 .../PipelineJobHasAlreadyFinishedException.java    |   2 +-
 .../pipeline/core/job/AbstractPipelineJob.java     |   4 +-
 .../data/pipeline/core/task/IncrementalTask.java   |  10 +-
 .../data/pipeline/core/task/InventoryTask.java     |  10 +-
 .../data/pipeline/core/task/PipelineTask.java      |  14 ++
 ...tencyCheckChangedJobConfigurationProcessor.java |   2 +-
 .../consistencycheck/ConsistencyCheckJob.java      |  61 +++-----
 .../ConsistencyCheckTasksRunner.java               | 139 +++++++++++++++++
 .../MigrationChangedJobConfigurationProcessor.java |   2 +-
 .../migration/MigrationDataConsistencyChecker.java | 167 +++------------------
 .../pipeline/scenario/migration/MigrationJob.java  |   4 +-
 ...MatchDataConsistencyCalculateAlgorithmTest.java |  17 +--
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  |   2 +-
 .../sqlbuilder/MySQLPipelineSQLBuilderTest.java    |   2 +-
 .../migration/general/MySQLMigrationGeneralIT.java |   9 +-
 ...taConsistencyCalculateAlgorithmFactoryTest.java |   1 +
 .../DataConsistencyCalculateAlgorithmFixture.java  |   9 +-
 .../FixtureDataConsistencyCalculatedResult.java}   |  19 +--
 31 files changed, 598 insertions(+), 298 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 7463fadae10..154643bdd59 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -22,7 +22,6 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 
@@ -43,8 +42,7 @@ public final class DataConsistencyCalculateParameter {
      */
     private final PipelineDataSourceWrapper dataSource;
     
-    // TODO replace to schemaName
-    private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+    private final String schemaName;
     
     private final String logicTableName;
     
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
similarity index 70%
copy from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
index 3d1c2827377..ea34f78d60e 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
 
-import org.junit.Test;
-
-import java.util.Properties;
-
-public final class DataConsistencyCalculateAlgorithmFactoryTest {
+/**
+ * Data consistency calculated result.
+ */
+public interface DataConsistencyCalculatedResult {
     
-    @Test
-    public void assertNewInstanceSuccess() {
-        DataConsistencyCalculateAlgorithmFactory.newInstance("FIXTURE", new Properties());
-    }
+    /**
+     * Get records count.
+     *
+     * @return records count
+     */
+    int getRecordsCount();
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
index 368b501ee5a..6ee50d96e10 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
@@ -18,9 +18,12 @@
 package org.apache.shardingsphere.data.pipeline.spi.check.consistency;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
 import org.apache.shardingsphere.infra.util.spi.aware.SPIMetadataAware;
 
+import java.sql.SQLException;
+
 /**
  * Data consistency calculate algorithm.
  */
@@ -32,5 +35,12 @@ public interface DataConsistencyCalculateAlgorithm extends ShardingSphereAlgorit
      * @param parameter data consistency calculate parameter
      * @return calculated result
      */
-    Iterable<Object> calculate(DataConsistencyCalculateParameter parameter);
+    Iterable<DataConsistencyCalculatedResult> calculate(DataConsistencyCalculateParameter parameter);
+    
+    /**
+     * Cancel calculation.
+     *
+     * @throws SQLException SQL exception if cancel underlying SQL execution failure
+     */
+    void cancel() throws SQLException;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
similarity index 92%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
index a37556b792f..8db297edc3c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.spi.check.consistency;
 
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 0a1299318a7..c5663ba699b 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -124,6 +124,7 @@ public interface PipelineSQLBuilder extends TypedSPI, RequiredSPI {
      * @param tableName table name
      * @return count SQL
      */
+    // TODO keep it for now, it might be used later
     String buildCountSQL(String schemaName, String tableName);
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index fdf355a8a4f..077d08c7281 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -20,8 +20,10 @@ package org.apache.shardingsphere.data.pipeline.core.api;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * Inventory incremental job API.
@@ -39,13 +41,24 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
     @Override
     InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
     
+    /**
+     * Build data consistency calculate algorithm.
+     *
+     * @param jobConfig job configuration
+     * @param algorithmType algorithm type
+     * @param algorithmProps algorithm properties
+     * @return calculate algorithm
+     */
+    DataConsistencyCalculateAlgorithm buildDataConsistencyCalculateAlgorithm(PipelineJobConfiguration jobConfig, String algorithmType, Properties algorithmProps);
+    
     /**
      * Do data consistency check.
      *
      * @param pipelineJobConfig job configuration
+     * @param calculateAlgorithm calculate algorithm
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig);
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm);
     
     /**
      * Aggregate data consistency check results.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index acb875a682a..b98eb85cf8f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -23,20 +23,20 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
-import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
 import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -150,19 +150,23 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         }).collect(Collectors.toList());
     }
     
+    @Override
+    public DataConsistencyCalculateAlgorithm buildDataConsistencyCalculateAlgorithm(final PipelineJobConfiguration jobConfig, final String algorithmType, final Properties algorithmProps) {
+        ShardingSpherePreconditions.checkState(null != algorithmType || null != jobConfig, () -> new IllegalArgumentException("algorithmType and jobConfig are null"));
+        if (null != algorithmType) {
+            return DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps);
+        }
+        return DataConsistencyCalculateAlgorithmChooser.choose(
+                DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
+    }
+    
     @Override
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
         checkModeConfig();
         log.info("Data consistency check for job {}", jobId);
         PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        return dataConsistencyCheck(jobConfig);
-    }
-    
-    @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig) {
-        DataConsistencyCalculateAlgorithm algorithm = DataConsistencyCalculateAlgorithmChooser.choose(
-                DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
-        return dataConsistencyCheck(jobConfig, algorithm);
+        DataConsistencyCalculateAlgorithm calculateAlgorithm = buildDataConsistencyCalculateAlgorithm(jobConfig, null, null);
+        return dataConsistencyCheck(jobConfig, calculateAlgorithm);
     }
     
     @Override
@@ -170,10 +174,11 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         checkModeConfig();
         log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
         PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps));
+        return dataConsistencyCheck(jobConfig, buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType, algorithmProps));
     }
     
-    protected Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+    @Override
+    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
         String jobId = jobConfig.getJobId();
         Map<String, DataConsistencyCheckResult> result = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
         log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculateAlgorithm.getType(), result);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
index 5e50032e3eb..499dfa006c5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
new file mode 100644
index 00000000000..3125fc778fc
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Single table inventory data consistency checker.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class SingleTableInventoryDataConsistencyChecker {
+    
+    private final String jobId;
+    
+    private final PipelineDataSourceWrapper sourceDataSource;
+    
+    private final PipelineDataSourceWrapper targetDataSource;
+    
+    private final SchemaTableName sourceTable;
+    
+    private final SchemaTableName targetTable;
+    
+    private final PipelineColumnMetaData uniqueKey;
+    
+    private final PipelineTableMetaDataLoader metaDataLoader;
+    
+    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+    
+    /**
+     * Data consistency check.
+     *
+     * @param calculateAlgorithm calculate algorithm
+     * @return data consistency check result
+     */
+    public DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) + "-check-%d");
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+        try {
+            return check(calculateAlgorithm, executor);
+        } finally {
+            executor.shutdown();
+            executor.shutdownNow();
+        }
+    }
+    
+    private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor) {
+        String sourceDatabaseType = sourceDataSource.getDatabaseType().getType();
+        String targetDatabaseType = targetDataSource.getDatabaseType().getType();
+        String sourceTableName = sourceTable.getTableName().getOriginal();
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(sourceTable.getSchemaName().getOriginal(), sourceTableName);
+        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
+        Collection<String> columnNames = tableMetaData.getColumnNames();
+        DataConsistencyCalculateParameter sourceParameter = buildParameter(
+                sourceDataSource, sourceTable.getSchemaName().getOriginal(), sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+        DataConsistencyCalculateParameter targetParameter = buildParameter(
+                targetDataSource, targetTable.getSchemaName().getOriginal(), targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
+        Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParameter).iterator();
+        Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParameter).iterator();
+        long sourceRecordsCount = 0;
+        long targetRecordsCount = 0;
+        boolean contentMatched = true;
+        while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
+            if (null != readRateLimitAlgorithm) {
+                readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
+            }
+            Future<DataConsistencyCalculatedResult> sourceFuture = executor.submit(sourceCalculatedResults::next);
+            Future<DataConsistencyCalculatedResult> targetFuture = executor.submit(targetCalculatedResults::next);
+            DataConsistencyCalculatedResult sourceCalculatedResult = waitFuture(sourceFuture);
+            DataConsistencyCalculatedResult targetCalculatedResult = waitFuture(targetFuture);
+            sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
+            targetRecordsCount += targetCalculatedResult.getRecordsCount();
+            contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
+            if (!contentMatched) {
+                log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
+                break;
+            }
+        }
+        return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
+    }
+    
+    // TODO use digest (crc32, murmurhash)
+    private String getJobIdDigest(final String jobId) {
+        return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
+    }
+    
+    private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource,
+                                                             final String schemaName, final String tableName, final Collection<String> columnNames,
+                                                             final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey) {
+        return new DataConsistencyCalculateParameter(sourceDataSource, schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+    }
+    
+    private <T> T waitFuture(final Future<T> future) {
+        try {
+            return future.get();
+        } catch (final InterruptedException | ExecutionException ex) {
+            if (ex.getCause() instanceof PipelineSQLException) {
+                throw (PipelineSQLException) ex.getCause();
+            }
+            throw new SQLWrapperException(new SQLException(ex));
+        }
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
new file mode 100644
index 00000000000..7e14ced4342
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+
+/**
+ * Abstract data consistency calculate algorithm.
+ */
+@Slf4j
+public abstract class AbstractDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
+    
+    @Getter(AccessLevel.PROTECTED)
+    private volatile boolean canceling;
+    
+    private volatile Statement currentStatement;
+    
+    protected <T extends Statement> T setCurrentStatement(final T statement) {
+        this.currentStatement = statement;
+        return statement;
+    }
+    
+    @Override
+    public void cancel() throws SQLException {
+        canceling = true;
+        Statement statement = currentStatement;
+        if (null == statement || statement.isClosed()) {
+            log.info("cancel, statement is null or closed");
+            return;
+        }
+        long startTimeMillis = System.currentTimeMillis();
+        try {
+            statement.cancel();
+        } catch (final SQLFeatureNotSupportedException ex) {
+            log.info("cancel is not supported: {}", ex.getMessage());
+        } catch (final SQLException ex) {
+            log.info("cancel failed: {}", ex.getMessage());
+        }
+        log.info("cancel cost {} ms", System.currentTimeMillis() - startTimeMillis);
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
index e35afffba1c..c1c18347e88 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
@@ -21,7 +21,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 
 import java.util.Iterator;
 import java.util.Optional;
@@ -33,10 +33,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 @RequiredArgsConstructor
 @Getter
 @Slf4j
-public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
+public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
     
     @Override
-    public final Iterable<Object> calculate(final DataConsistencyCalculateParameter parameter) {
+    public final Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
         return new ResultIterable(parameter);
     }
     
@@ -46,30 +46,30 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm impleme
      * @param parameter data consistency calculate parameter
      * @return optional calculated result, empty means there's no more result
      */
-    protected abstract Optional<Object> calculateChunk(DataConsistencyCalculateParameter parameter);
+    protected abstract Optional<DataConsistencyCalculatedResult> calculateChunk(DataConsistencyCalculateParameter parameter);
     
     /**
      * It's not thread-safe, it should be executed in only one thread at the same time.
      */
     @RequiredArgsConstructor
-    final class ResultIterable implements Iterable<Object> {
+    final class ResultIterable implements Iterable<DataConsistencyCalculatedResult> {
         
         private final DataConsistencyCalculateParameter parameter;
         
         @Override
-        public Iterator<Object> iterator() {
+        public Iterator<DataConsistencyCalculatedResult> iterator() {
             return new ResultIterator(parameter);
         }
     }
     
     @RequiredArgsConstructor
-    final class ResultIterator implements Iterator<Object> {
+    final class ResultIterator implements Iterator<DataConsistencyCalculatedResult> {
         
         private final DataConsistencyCalculateParameter parameter;
         
         private final AtomicInteger calculationCount = new AtomicInteger(0);
         
-        private volatile Optional<Object> nextResult;
+        private volatile Optional<DataConsistencyCalculatedResult> nextResult;
         
         @Override
         public boolean hasNext() {
@@ -78,9 +78,9 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm impleme
         }
         
         @Override
-        public Object next() {
+        public DataConsistencyCalculatedResult next() {
             calculateIfNecessary();
-            Optional<Object> nextResult = this.nextResult;
+            Optional<DataConsistencyCalculatedResult> nextResult = this.nextResult;
             parameter.setPreviousCalculatedResult(nextResult.orElse(null));
             this.nextResult = null;
             return nextResult.orElse(null);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index b2eeac94959..230298808c9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -18,11 +18,14 @@
 package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
 
 import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32DataConsistencyCalculateAlgorithmException;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -34,6 +37,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -41,11 +45,12 @@ import java.util.stream.Collectors;
 /**
  * CRC32 match data consistency calculate algorithm.
  */
-@Getter
-public final class CRC32MatchDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
+@Slf4j
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
     
     private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
     
+    @Getter
     private Properties props;
     
     @Override
@@ -54,26 +59,29 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithm implements DataCo
     }
     
     @Override
-    public Iterable<Object> calculate(final DataConsistencyCalculateParameter parameter) {
+    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
-        return Collections.unmodifiableList(parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList()));
+        List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
+        return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
     }
     
-    private long calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+    private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
         String logicTableName = parameter.getLogicTableName();
-        String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(logicTableName);
+        String schemaName = parameter.getSchemaName();
         Optional<String> sql = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName);
         ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
         return calculateCRC32(parameter.getDataSource(), logicTableName, sql.get());
     }
     
-    private long calculateCRC32(final DataSource dataSource, final String logicTableName, final String sql) {
+    private CalculatedItem calculateCRC32(final DataSource dataSource, final String logicTableName, final String sql) {
         try (
                 Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql);
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql));
                 ResultSet resultSet = preparedStatement.executeQuery()) {
             resultSet.next();
-            return resultSet.getLong(1);
+            long crc32 = resultSet.getLong(1);
+            int recordsCount = resultSet.getInt(2);
+            return new CalculatedItem(crc32, recordsCount);
         } catch (final SQLException ex) {
             throw new PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
         }
@@ -93,4 +101,52 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithm implements DataCo
     public String getDescription() {
         return "Match CRC32 of records.";
     }
+    
+    @RequiredArgsConstructor
+    @Getter
+    private static final class CalculatedItem {
+        
+        private final long crc32;
+        
+        private final int recordsCount;
+    }
+    
+    @RequiredArgsConstructor
+    @Getter
+    private static final class CalculatedResult implements DataConsistencyCalculatedResult {
+        
+        private final int recordsCount;
+        
+        @NonNull
+        private final Collection<Long> columnsCrc32;
+        
+        @Override
+        public boolean equals(final @NonNull Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (getClass() != o.getClass()) {
+                log.warn("CalculatedResult type not match, o.className={}", o.getClass().getName());
+                return false;
+            }
+            final CalculatedResult that = (CalculatedResult) o;
+            if (recordsCount != that.recordsCount) {
+                log.info("recordsCount not match, recordsCount={}, that.recordsCount={}", recordsCount, that.recordsCount);
+                return false;
+            }
+            if (!columnsCrc32.equals(that.columnsCrc32)) {
+                log.info("columnsCrc32 not match, columnsCrc32={}, that.columnsCrc32={}", columnsCrc32, that.columnsCrc32);
+                return false;
+            } else {
+                return true;
+            }
+        }
+        
+        @Override
+        public int hashCode() {
+            int result = recordsCount;
+            result = 31 * result + columnsCrc32.hashCode();
+            return result;
+        }
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index da0dbf28569..a2df6b70b79 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.ColumnValueReaderFactory;
@@ -87,12 +88,12 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     }
     
     @Override
-    protected Optional<Object> calculateChunk(final DataConsistencyCalculateParameter parameter) {
+    protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
         CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
         String sql = getQuerySQL(parameter);
         try (
                 Connection connection = parameter.getDataSource().getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+                PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
             preparedStatement.setFetchSize(chunkSize);
             if (null == previousCalculatedResult) {
                 preparedStatement.setInt(1, chunkSize);
@@ -105,6 +106,10 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 ColumnValueReader columnValueReader = ColumnValueReaderFactory.getInstance(parameter.getDatabaseType());
                 while (resultSet.next()) {
+                    if (isCanceling()) {
+                        log.info("canceling, schemaName={}, tableName={}", parameter.getSchemaName(), parameter.getLogicTableName());
+                        throw new PipelineTableDataConsistencyCheckLoadingFailedException(parameter.getLogicTableName());
+                    }
                     ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
                     int columnCount = resultSetMetaData.getColumnCount();
                     Collection<Object> record = new LinkedList<>();
@@ -124,11 +129,11 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     private String getQuerySQL(final DataConsistencyCalculateParameter parameter) {
         PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
         String logicTableName = parameter.getLogicTableName();
-        String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(logicTableName);
+        String schemaName = parameter.getSchemaName();
         String uniqueKey = parameter.getUniqueKey().getName();
-        String cacheKey = parameter.getDatabaseType() + "-" + (DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
-                ? schemaName.toLowerCase() + "." + logicTableName.toLowerCase()
-                : logicTableName.toLowerCase());
+        String cacheKey = parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+                ? schemaName + "." + logicTableName
+                : logicTableName);
         if (null == parameter.getPreviousCalculatedResult()) {
             return firstSQLCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, true));
         }
@@ -152,12 +157,12 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     
     @RequiredArgsConstructor
     @Getter
-    private static final class CalculatedResult {
+    private static final class CalculatedResult implements DataConsistencyCalculatedResult {
         
         @NonNull
         private final Object maxUniqueKeyValue;
         
-        private final int recordCount;
+        private final int recordsCount;
         
         private final Collection<Collection<Object>> records;
         
@@ -172,10 +177,10 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
                 return false;
             }
             final CalculatedResult that = (CalculatedResult) o;
-            boolean equalsFirst = new EqualsBuilder().append(getRecordCount(), that.getRecordCount()).append(getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue()).isEquals();
+            boolean equalsFirst = new EqualsBuilder().append(getRecordsCount(), that.getRecordsCount()).append(getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue()).isEquals();
             if (!equalsFirst) {
                 log.warn("recordCount or maxUniqueKeyValue not match, recordCount1={}, recordCount2={}, maxUniqueKeyValue1={}, maxUniqueKeyValue2={}",
-                        getRecordCount(), that.getRecordCount(), getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue());
+                        getRecordsCount(), that.getRecordsCount(), getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue());
                 return false;
             }
             Iterator<Collection<Object>> thisIterator = this.records.iterator();
@@ -218,7 +223,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         
         @Override
         public int hashCode() {
-            return new HashCodeBuilder(17, 37).append(getMaxUniqueKeyValue()).append(getRecordCount()).append(getRecords()).toHashCode();
+            return new HashCodeBuilder(17, 37).append(getMaxUniqueKeyValue()).append(getRecordsCount()).append(getRecords()).toHashCode();
         }
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
index edbd4afb2f7..afdbcce0856 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
@@ -28,6 +28,6 @@ public final class PipelineJobHasAlreadyFinishedException extends PipelineSQLExc
     private static final long serialVersionUID = 6881217592831423520L;
     
     public PipelineJobHasAlreadyFinishedException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 95, "Job has already finished, please run `CHECK MIGRATION %s` to start a new data consistency check job.", jobId);
+        super(XOpenSQLState.GENERAL_ERROR, 95, "Data consistency check job has already finished.", jobId);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 7d14d2707e0..d48f4467a0c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -22,7 +22,7 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 
 import java.util.Map;
 import java.util.Optional;
@@ -42,7 +42,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     private volatile boolean stopping;
     
     @Setter
-    private volatile OneOffJobBootstrap oneOffJobBootstrap;
+    private volatile JobBootstrap jobBootstrap;
     
     private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 9e7363970a0..8b435383e69 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -104,11 +104,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
         });
     }
     
-    /**
-     * Start.
-     *
-     * @return future
-     */
+    @Override
     public CompletableFuture<?> start() {
         taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
         CompletableFuture<?> dumperFuture = incrementalExecuteEngine.submit(dumper, new ExecuteCallback() {
@@ -140,9 +136,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
         return CompletableFuture.allOf(dumperFuture, importerFuture);
     }
     
-    /**
-     * Stop.
-     */
+    @Override
     public void stop() {
         dumper.stop();
         for (Importer each : importers) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index ca90aeb859e..a69ee2d121d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -83,11 +83,7 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
         return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
     }
     
-    /**
-     * Start.
-     *
-     * @return future
-     */
+    @Override
     public CompletableFuture<?> start() {
         CompletableFuture<?> dumperFuture = inventoryDumperExecuteEngine.submit(dumper, new ExecuteCallback() {
             
@@ -138,9 +134,7 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
         return null;
     }
     
-    /**
-     * Stop.
-     */
+    @Override
     public void stop() {
         dumper.stop();
         importer.stop();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index ebc5a7e0e33..45dabe9ad68 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -19,11 +19,25 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 
 import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Pipeline task interface.
  */
 public interface PipelineTask {
     
+    /**
+     * Start task.
+     *
+     * @return future
+     */
+    CompletableFuture<?> start();
+    
+    /**
+     * Stop task.
+     */
+    void stop();
+    
     /**
      * Get task id.
      *
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index 8d71eae5a7c..406fe64310c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -70,7 +70,7 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
         ConsistencyCheckJob job = new ConsistencyCheckJob();
         PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
-        job.setOneOffJobBootstrap(oneOffJobBootstrap);
+        job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 77355f5281d..b86955e252d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -18,26 +18,16 @@
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
-
-import java.util.Collections;
-import java.util.Map;
 
 /**
  * Consistency check job.
@@ -45,50 +35,43 @@ import java.util.Map;
 @Slf4j
 public final class ConsistencyCheckJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
     
-    private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
-    
     private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
     
     @Override
     public void execute(final ShardingContext shardingContext) {
         String checkJobId = shardingContext.getJobName();
+        int shardingItem = shardingContext.getShardingItem();
+        log.info("Execute job {}-{}", checkJobId, shardingItem);
+        if (isStopping()) {
+            log.info("stopping true, ignore");
+            return;
+        }
         setJobId(checkJobId);
-        ConsistencyCheckJobConfiguration consistencyCheckJobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        JobStatus status = JobStatus.RUNNING;
-        ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, status);
-        jobAPI.persistJobItemProgress(jobItemContext);
-        String parentJobId = consistencyCheckJobConfig.getParentJobId();
-        log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, parentJobId);
-        JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
-        InventoryIncrementalJobPublicAPI jobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
-        Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = Collections.emptyMap();
-        try {
-            dataConsistencyCheckResult = StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())
-                    ? jobPublicAPI.dataConsistencyCheck(parentJobId)
-                    : jobPublicAPI.dataConsistencyCheck(parentJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProps());
-            status = JobStatus.FINISHED;
-        } catch (final SQLWrapperException ex) {
-            log.error("data consistency check failed", ex);
-            status = JobStatus.CONSISTENCY_CHECK_FAILURE;
-            jobAPI.persistJobItemErrorMessage(checkJobId, 0, ex);
+        ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING);
+        if (getTasksRunnerMap().containsKey(shardingItem)) {
+            log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
+            return;
         }
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
-        jobItemContext.setStatus(status);
-        jobAPI.persistJobItemProgress(jobItemContext);
-        jobAPI.stop(checkJobId);
-        log.info("execute consistency check job finished, job id:{}, parent job id:{}", checkJobId, parentJobId);
+        ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
+        tasksRunner.start();
+        getTasksRunnerMap().put(shardingItem, tasksRunner);
     }
     
     @Override
     public void stop() {
         setStopping(true);
-        if (null != getOneOffJobBootstrap()) {
-            getOneOffJobBootstrap().shutdown();
+        if (null != getJobBootstrap()) {
+            getJobBootstrap().shutdown();
         }
         if (null == getJobId()) {
             log.info("stop consistency check job, jobId is null, ignore");
             return;
         }
+        for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
+            each.stop();
+        }
+        getTasksRunnerMap().clear();
         String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
         pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 0);
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
new file mode 100644
index 00000000000..69055b5a85d
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * Consistency check tasks runner.
+ */
+@Slf4j
+public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
+    
+    private final ConsistencyCheckJobAPI checkJobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+    
+    @Getter
+    private final ConsistencyCheckJobItemContext jobItemContext;
+    
+    private final ConsistencyCheckJobConfiguration checkJobConfig;
+    
+    private final String checkJobId;
+    
+    private final String parentJobId;
+    
+    private final LifecycleExecutor checkExecutor;
+    
+    private final ExecuteCallback checkExecuteCallback;
+    
+    public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
+        this.jobItemContext = jobItemContext;
+        checkJobConfig = jobItemContext.getJobConfig();
+        checkJobId = checkJobConfig.getJobId();
+        parentJobId = checkJobConfig.getParentJobId();
+        checkExecutor = new CheckLifecycleExecutor();
+        checkExecuteCallback = new CheckExecuteCallback();
+    }
+    
+    @Override
+    public void start() {
+        if (jobItemContext.isStopping()) {
+            log.info("job stopping, ignore consistency check");
+            return;
+        }
+        PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
+        ExecuteEngine executeEngine = ExecuteEngine.newFixedThreadInstance(1, checkJobId + "-check");
+        executeEngine.submit(checkExecutor, checkExecuteCallback);
+    }
+    
+    @Override
+    public void stop() {
+        jobItemContext.setStopping(true);
+        log.info("stop, jobId={}, shardingItem={}", jobItemContext.getJobId(), jobItemContext.getShardingItem());
+        checkExecutor.stop();
+    }
+    
+    private final class CheckLifecycleExecutor extends AbstractLifecycleExecutor {
+        
+        private volatile DataConsistencyCalculateAlgorithm calculateAlgorithm;
+        
+        @Override
+        protected void runBlocking() {
+            log.info("execute consistency check, check job id: {}, parent job id: {}", checkJobId, parentJobId);
+            checkJobAPI.persistJobItemProgress(jobItemContext);
+            JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
+            InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) PipelineAPIFactory.getPipelineJobAPI(jobType);
+            PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(parentJobId);
+            DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(
+                    parentJobConfig, checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
+            this.calculateAlgorithm = calculateAlgorithm;
+            Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm);
+            PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
+        }
+        
+        @Override
+        protected void doStop() {
+            DataConsistencyCalculateAlgorithm algorithm = calculateAlgorithm;
+            log.info("doStop, algorithm={}", algorithm);
+            if (null != algorithm) {
+                try {
+                    algorithm.cancel();
+                } catch (final SQLException ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+        }
+    }
+    
+    private final class CheckExecuteCallback implements ExecuteCallback {
+        
+        @Override
+        public void onSuccess() {
+            log.info("onSuccess, check job id: {}, parent job id: {}", checkJobId, parentJobId);
+            jobItemContext.setStatus(JobStatus.FINISHED);
+            checkJobAPI.persistJobItemProgress(jobItemContext);
+            checkJobAPI.stop(checkJobId);
+        }
+        
+        @Override
+        public void onFailure(final Throwable throwable) {
+            log.info("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId);
+            checkJobAPI.persistJobItemErrorMessage(checkJobId, 0, throwable);
+            jobItemContext.setStatus(JobStatus.CONSISTENCY_CHECK_FAILURE);
+            checkJobAPI.persistJobItemProgress(jobItemContext);
+            checkJobAPI.stop(checkJobId);
+        }
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 404bf1edc72..b0429f58cd2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -72,7 +72,7 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
         MigrationJob job = new MigrationJob();
         PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
-        job.setOneOffJobBootstrap(oneOffJobBootstrap);
+        job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index ea68ca1cd0b..50ab3a39afb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -18,54 +18,32 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableInventoryDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
 
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Data consistency checker for migration job.
@@ -81,144 +59,35 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
     
     public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext) {
         this.jobConfig = jobConfig;
-        this.readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
+        readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
         tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
                 TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
     }
     
     @Override
-    public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculator) {
-        Map<String, DataConsistencyCountCheckResult> countCheckResult = checkCount();
-        Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
-                ? checkData(calculator)
-                : Collections.emptyMap();
-        Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>(countCheckResult.size());
-        for (Entry<String, DataConsistencyCountCheckResult> entry : countCheckResult.entrySet()) {
-            result.put(entry.getKey(), new DataConsistencyCheckResult(entry.getValue(), contentCheckResult.getOrDefault(entry.getKey(), new DataConsistencyContentCheckResult(false))));
-        }
-        return result;
-    }
-    
-    private Map<String, DataConsistencyCountCheckResult> checkCount() {
-        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-count-check-%d");
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+    public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+        verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getSource());
+        verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getTarget());
+        SchemaTableName sourceTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getSourceTableName())), new TableName(jobConfig.getSourceTableName()));
+        SchemaTableName targetTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())), new TableName(jobConfig.getTargetTableName()));
         PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
         PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
-        Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(1, 1);
+        Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
         try (
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
                 PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
-            result.put(jobConfig.getSourceTableName(), checkCount(sourceDataSource, targetDataSource, executor));
-            return result;
+            PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
+            SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(
+                    jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm);
+            result.put(sourceTable.getTableName().getOriginal(), singleTableInventoryChecker.check(calculateAlgorithm));
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
-        } finally {
-            executor.shutdown();
-            executor.shutdownNow();
-        }
-    }
-    
-    private DataConsistencyCountCheckResult checkCount(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final ThreadPoolExecutor executor) {
-        Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, jobConfig.getSourceTableName(), sourceDataSource.getDatabaseType()));
-        Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, jobConfig.getTargetTableName(), targetDataSource.getDatabaseType()));
-        long sourceCount;
-        long targetCount;
-        try {
-            sourceCount = sourceFuture.get();
-        } catch (final InterruptedException | ExecutionException ex) {
-            if (ex.getCause() instanceof PipelineSQLException) {
-                throw (PipelineSQLException) ex.getCause();
-            }
-            throw new SQLWrapperException(new SQLException(ex));
-        }
-        try {
-            targetCount = targetFuture.get();
-        } catch (final InterruptedException | ExecutionException ex) {
-            if (ex.getCause() instanceof PipelineSQLException) {
-                throw (PipelineSQLException) ex.getCause();
-            }
-            throw new SQLWrapperException(new SQLException(ex));
-        }
-        return new DataConsistencyCountCheckResult(sourceCount, targetCount);
-    }
-    
-    // TODO use digest (crc32, murmurhash)
-    private String getJobIdDigest(final String jobId) {
-        return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
-    }
-    
-    private long count(final DataSource dataSource, final String tableName, final DatabaseType databaseType) {
-        String sql = PipelineSQLBuilderFactory.getInstance(databaseType.getType()).buildCountSQL(tableNameSchemaNameMapping.getSchemaName(tableName), tableName);
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(sql);
-                ResultSet resultSet = preparedStatement.executeQuery()) {
-            resultSet.next();
-            return resultSet.getLong(1);
-        } catch (final SQLException ex) {
-            throw new PipelineTableDataConsistencyCheckLoadingFailedException(tableName);
-        }
-    }
-    
-    private Map<String, DataConsistencyContentCheckResult> checkData(final DataConsistencyCalculateAlgorithm calculator) {
-        checkPipelineDatabaseType(calculator, jobConfig.getSource());
-        PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSource();
-        checkPipelineDatabaseType(calculator, jobConfig.getTarget());
-        PipelineDataSourceConfiguration targetDataSourceConfig = jobConfig.getTarget();
-        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
-        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
-        Map<String, DataConsistencyContentCheckResult> result = new HashMap<>(1, 1);
-        try (
-                PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
-                PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
-            String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
-            String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
-            StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
-            for (String each : Collections.singleton(jobConfig.getSourceTableName())) {
-                PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
-                ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(each));
-                Collection<String> columnNames = tableMetaData.getColumnNames();
-                PipelineColumnMetaData uniqueKey = jobConfig.getUniqueKeyColumn();
-                DataConsistencyCalculateParameter sourceParameter = buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
-                DataConsistencyCalculateParameter targetParameter = buildParameter(
-                        targetDataSource, tableNameSchemaNameMapping, jobConfig.getTargetTableName(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
-                Iterator<Object> sourceCalculatedResults = calculator.calculate(sourceParameter).iterator();
-                Iterator<Object> targetCalculatedResults = calculator.calculate(targetParameter).iterator();
-                boolean contentMatched = true;
-                while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
-                    if (null != readRateLimitAlgorithm) {
-                        readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
-                    }
-                    Future<Object> sourceFuture = executor.submit(sourceCalculatedResults::next);
-                    Future<Object> targetFuture = executor.submit(targetCalculatedResults::next);
-                    Object sourceCalculatedResult = sourceFuture.get();
-                    Object targetCalculatedResult = targetFuture.get();
-                    contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
-                    if (!contentMatched) {
-                        break;
-                    }
-                }
-                result.put(each, new DataConsistencyContentCheckResult(contentMatched));
-            }
-        } catch (final SQLException ex) {
-            throw new SQLWrapperException(ex);
-        } catch (final ExecutionException | InterruptedException ex) {
-            throw new SQLWrapperException(new SQLException(ex.getCause()));
-        } finally {
-            executor.shutdown();
-            executor.shutdownNow();
         }
         return result;
     }
     
-    private void checkPipelineDatabaseType(final DataConsistencyCalculateAlgorithm calculator, final PipelineDataSourceConfiguration dataSourceConfig) {
-        ShardingSpherePreconditions.checkState(calculator.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType().getType()),
+    private void verifyPipelineDatabaseType(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final PipelineDataSourceConfiguration dataSourceConfig) {
+        ShardingSpherePreconditions.checkState(calculateAlgorithm.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType().getType()),
                 () -> new UnsupportedPipelineDatabaseTypeException(dataSourceConfig.getDatabaseType()));
     }
-    
-    private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource, final TableNameSchemaNameMapping tableNameSchemaNameMapping,
-                                                             final String tableName, final Collection<String> columnNames,
-                                                             final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey) {
-        return new DataConsistencyCalculateParameter(sourceDataSource, tableNameSchemaNameMapping, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
-    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 018570e184c..244dc46efcf 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -112,8 +112,8 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
     public void stop() {
         setStopping(true);
         dataSourceManager.close();
-        if (null != getOneOffJobBootstrap()) {
-            getOneOffJobBootstrap().shutdown();
+        if (null != getJobBootstrap()) {
+            getJobBootstrap().shutdown();
         }
         String jobId = getJobId();
         if (null == jobId) {
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index b0d0f42a10d..e5ae0a9635e 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
@@ -35,7 +35,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -60,28 +59,28 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithmTest {
     @Before
     public void setUp() throws SQLException {
         PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "id", Types.INTEGER, "integer", false, true, true);
-        parameter = new DataConsistencyCalculateParameter(pipelineDataSource, new TableNameSchemaNameMapping(Collections.emptyMap()),
+        parameter = new DataConsistencyCalculateParameter(pipelineDataSource, null,
                 "foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", "FIXTURE", uniqueKey);
         when(pipelineDataSource.getConnection()).thenReturn(connection);
     }
     
     @Test
     public void assertCalculateSuccess() throws SQLException {
-        PreparedStatement preparedStatement0 = mockPreparedStatement(0L);
+        PreparedStatement preparedStatement0 = mockPreparedStatement(123L, 10);
         when(connection.prepareStatement("SELECT CRC32(foo_col) FROM foo_tbl")).thenReturn(preparedStatement0);
-        PreparedStatement preparedStatement1 = mockPreparedStatement(1L);
+        PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
         when(connection.prepareStatement("SELECT CRC32(bar_col) FROM foo_tbl")).thenReturn(preparedStatement1);
-        Iterator<Object> actual = new CRC32MatchDataConsistencyCalculateAlgorithm().calculate(parameter).iterator();
-        assertThat(actual.next(), is(0L));
-        assertThat(actual.next(), is(1L));
+        Iterator<DataConsistencyCalculatedResult> actual = new CRC32MatchDataConsistencyCalculateAlgorithm().calculate(parameter).iterator();
+        assertThat(actual.next().getRecordsCount(), is(10));
         assertFalse(actual.hasNext());
     }
     
-    private PreparedStatement mockPreparedStatement(final long expectedCRC32Result) throws SQLException {
+    private PreparedStatement mockPreparedStatement(final long expectedCRC32Result, final int expectedRecordsCount) throws SQLException {
         ResultSet resultSet = mock(ResultSet.class);
         PreparedStatement result = mock(PreparedStatement.class, RETURNS_DEEP_STUBS);
         when(result.executeQuery()).thenReturn(resultSet);
         when(resultSet.getLong(1)).thenReturn(expectedCRC32Result);
+        when(resultSet.getInt(2)).thenReturn(expectedRecordsCount);
         return result;
     }
     
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 14440f0344d..286de508afe 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -51,7 +51,7 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
     @Override
     public Optional<String> buildCRC32SQL(final String schemaName, final String tableName, final String column) {
-        return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS UNSIGNED)) AS checksum FROM %s", quote(column), quote(tableName)));
+        return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", quote(column), quote(tableName)));
     }
     
     @Override
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 3a2049c3487..f2b276fefde 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -55,7 +55,7 @@ public final class MySQLPipelineSQLBuilderTest {
     public void assertBuildSumCrc32SQL() {
         Optional<String> actual = sqlBuilder.buildCRC32SQL(null, "t2", "id");
         assertTrue(actual.isPresent());
-        assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(id) AS UNSIGNED)) AS checksum FROM t2"));
+        assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(id) AS UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM t2"));
     }
     
     private DataRecord mockDataRecord(final String tableName) {
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 0c4047c2286..2929e143128 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -99,8 +99,9 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
         startIncrementTask(new MySQLIncrementTask(jdbcTemplate, getSourceTableOrderName(), keyGenerateAlgorithm, 30));
         String orderJobId = getJobIdByTableName(getSourceTableOrderName());
         String orderItemJobId = getJobIdByTableName("t_order_item");
-        assertMigrationSuccessById(orderJobId);
-        assertMigrationSuccessById(orderItemJobId);
+        assertMigrationSuccessById(orderJobId, "DATA_MATCH");
+        assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
+        assertMigrationSuccessById(orderItemJobId, "CRC32_MATCH");
         if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
             for (String each : listJobId()) {
                 commitMigrationByJobId(each);
@@ -111,12 +112,12 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
         assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, "");
     }
     
-    private void assertMigrationSuccessById(final String jobId) throws SQLException, InterruptedException {
+    private void assertMigrationSuccessById(final String jobId, final String algorithmType) throws SQLException, InterruptedException {
         List<Map<String, Object>> jobStatus = waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         for (Map<String, Object> each : jobStatus) {
             assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
         }
-        assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+        assertCheckMigrationSuccess(jobId, algorithmType);
         stopMigrationByJobId(jobId);
     }
 }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
index 3d1c2827377..01fd3394a42 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.junit.Test;
 
 import java.util.Properties;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
index f6ec7327507..051ce468505 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -39,8 +40,12 @@ public final class DataConsistencyCalculateAlgorithmFixture implements DataConsi
     }
     
     @Override
-    public Iterable<Object> calculate(final DataConsistencyCalculateParameter parameter) {
-        return Collections.singletonList(true);
+    public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+        return Collections.singletonList(new FixtureDataConsistencyCalculatedResult(2));
+    }
+    
+    @Override
+    public void cancel() {
     }
     
     @Override
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
similarity index 64%
copy from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
copy to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
index 3d1c2827377..b61af786c4a 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import org.junit.Test;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 
-import java.util.Properties;
-
-public final class DataConsistencyCalculateAlgorithmFactoryTest {
+@RequiredArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class FixtureDataConsistencyCalculatedResult implements DataConsistencyCalculatedResult {
     
-    @Test
-    public void assertNewInstanceSuccess() {
-        DataConsistencyCalculateAlgorithmFactory.newInstance("FIXTURE", new Properties());
-    }
+    private final int recordsCount;
 }