You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/19 01:52:43 UTC

[shardingsphere] branch master updated: Add DataConsistencyCountCheckResult and DataConsistencyContentCheckResult to avoid set field in DataConsistencyCheckResult (#16910)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d2a9db55f30 Add DataConsistencyCountCheckResult and DataConsistencyContentCheckResult to avoid set field in DataConsistencyCheckResult (#16910)
d2a9db55f30 is described below

commit d2a9db55f3003834c28be596572bbb1b47e9940c
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Apr 19 09:52:38 2022 +0800

    Add DataConsistencyCountCheckResult and DataConsistencyContentCheckResult to avoid set field in DataConsistencyCheckResult (#16910)
    
    * Refactor DataConsistencyCheckResult
    
    * Refactor DataConsistencyChecker
    
    * Refactor CheckScalingQueryResultSet
    
    * Refactor DataConsistencyContentCheckResult and DataConsistencyCountCheckResult
    
    * Move packages
    
    * Remove useless getter for DataConsistencyCheckerImpl
    
    * Refactor DataConsistencyCheckerImpl
---
 .../handler/query/CheckScalingQueryResultSet.java  | 14 ++---
 .../core/api/impl/RuleAlteredJobAPIImpl.java       | 26 +++++----
 .../check/consistency/DataConsistencyChecker.java  |  9 +--
 .../consistency/DataConsistencyCheckerImpl.java    | 64 +++++++++------------
 ...StreamingDataConsistencyCalculateAlgorithm.java |  2 +-
 ...RC32MatchDataConsistencyCalculateAlgorithm.java |  2 +-
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  2 +-
 .../core/datasource/PipelineDataSourceFactory.java |  5 +-
 .../core/datasource/PipelineDataSourceManager.java |  6 +-
 .../memory}/MemoryPipelineChannelFactory.java      |  4 +-
 .../data/pipeline/core/job/FinishedCheckJob.java   |  4 +-
 .../scenario/rulealtered/RuleAlteredContext.java   |  2 +-
 .../job/environment/ScalingEnvironmentManager.java |  4 +-
 ...k.consistency.DataConsistencyCalculateAlgorithm |  4 +-
 ...eline.spi.ingest.channel.PipelineChannelFactory |  2 +-
 ...MatchDataConsistencyCalculateAlgorithmTest.java |  2 +-
 .../fixture/FixturePipelineSQLBuilder.java         |  2 +-
 ...k.consistency.DataConsistencyCalculateAlgorithm |  4 +-
 ...data.pipeline.spi.sqlbuilder.PipelineSQLBuilder |  2 +-
 .../data/pipeline/api/RuleAlteredJobAPI.java       |  4 +-
 .../consistency/DataConsistencyCheckResult.java    | 20 ++-----
 ...java => DataConsistencyContentCheckResult.java} | 22 ++------
 ...t.java => DataConsistencyCountCheckResult.java} | 14 ++---
 .../test/mysql/env/ITEnvironmentContext.java       |  3 +-
 .../test/mysql/env/config/TargetConfiguration.java | 11 ----
 .../core/api/impl/RuleAlteredJobAPIImplTest.java   | 65 ++++++++++++++--------
 .../DataConsistencyCheckerImplTest.java            | 13 +++--
 .../pipeline/core/util/PipelineContextUtil.java    |  4 +-
 .../scenario/rulealtered/RuleAlteredJobTest.java   |  2 +-
 29 files changed, 144 insertions(+), 174 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
index 5858521f6bd..dc5b5dee3f8 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
@@ -52,13 +52,13 @@ public final class CheckScalingQueryResultSet implements DistSQLResultSet {
         }
         data = checkResultMap.entrySet().stream()
                 .map(each -> {
-                    Collection<Object> list = new LinkedList<>();
-                    list.add(each.getKey());
-                    list.add(each.getValue().getSourceRecordsCount());
-                    list.add(each.getValue().getTargetRecordsCount());
-                    list.add(each.getValue().isRecordsCountMatched() + "");
-                    list.add(each.getValue().isRecordsContentMatched() + "");
-                    return list;
+                    Collection<Object> result = new LinkedList<>();
+                    result.add(each.getKey());
+                    result.add(each.getValue().getCountCheckResult().getSourceRecordsCount());
+                    result.add(each.getValue().getCountCheckResult().getTargetRecordsCount());
+                    result.add(each.getValue().getCountCheckResult().isMatched() + "");
+                    result.add(each.getValue().getContentCheckResult().isMatched() + "");
+                    return result;
                 }).collect(Collectors.toList()).iterator();
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index ed2b9cab1af..c64c62021ba 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -29,8 +31,8 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgo
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
@@ -293,24 +295,26 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     private Map<String, DataConsistencyCheckResult> dataConsistencyCheck0(final JobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
         String jobId = jobConfig.getHandleConfig().getJobId();
         DataConsistencyChecker dataConsistencyChecker = EnvironmentCheckerFactory.newInstance(jobConfig);
-        Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.checkRecordsCount();
-        if (result.values().stream().allMatch(DataConsistencyCheckResult::isRecordsCountMatched)) {
-            Map<String, Boolean> contentCheckResult = dataConsistencyChecker.checkRecordsContent(calculator);
-            result.forEach((key, value) -> value.setRecordsContentMatched(contentCheckResult.getOrDefault(key, false)));
+        Map<String, DataConsistencyCountCheckResult> countCheckResult = dataConsistencyChecker.checkCount();
+        Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
+                ? dataConsistencyChecker.checkContent(calculator) : Collections.emptyMap();
+        log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getClass().getName(), countCheckResult);
+        Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>(countCheckResult.size());
+        for (Entry<String, DataConsistencyCountCheckResult> entry : countCheckResult.entrySet()) {
+            result.put(entry.getKey(), new DataConsistencyCheckResult(entry.getValue(), contentCheckResult.getOrDefault(entry.getKey(), new DataConsistencyContentCheckResult(false))));
         }
-        log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getClass().getName(), result);
         PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
         return result;
     }
     
     @Override
-    public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResultMap) {
-        if (checkResultMap.isEmpty()) {
+    public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
+        if (checkResults.isEmpty()) {
             return false;
         }
-        for (Entry<String, DataConsistencyCheckResult> entry : checkResultMap.entrySet()) {
-            boolean recordsCountMatched = entry.getValue().isRecordsCountMatched();
-            boolean recordsContentMatched = entry.getValue().isRecordsContentMatched();
+        for (Entry<String, DataConsistencyCheckResult> entry : checkResults.entrySet()) {
+            boolean recordsCountMatched = entry.getValue().getCountCheckResult().isMatched();
+            boolean recordsContentMatched = entry.getValue().getContentCheckResult().isMatched();
             if (!recordsContentMatched || !recordsCountMatched) {
                 log.error("Scaling job: {}, table: {} data consistency check failed, recordsContentMatched: {}, recordsCountMatched: {}",
                         jobId, entry.getKey(), recordsContentMatched, recordsCountMatched);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 7b6ec52b6cc..904ea28e053 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -17,13 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 
 import java.util.Map;
 
 /**
- * Data consistency checker interface.
+ * Data consistency checker.
  */
 public interface DataConsistencyChecker {
     
@@ -32,7 +33,7 @@ public interface DataConsistencyChecker {
      *
      * @return records count check result. key is logic table name, value is check result.
      */
-    Map<String, DataConsistencyCheckResult> checkRecordsCount();
+    Map<String, DataConsistencyCountCheckResult> checkCount();
     
     /**
      * Check whether each table's records content is valid.
@@ -40,5 +41,5 @@ public interface DataConsistencyChecker {
      * @param calculator data consistency calculate algorithm
      * @return records content check result. key is logic table name, value is check result.
      */
-    Map<String, Boolean> checkRecordsContent(DataConsistencyCalculateAlgorithm calculator);
+    Map<String, DataConsistencyContentCheckResult> checkContent(DataConsistencyCalculateAlgorithm calculator);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index e4c69c65790..075d686b091 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -18,10 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import com.google.common.base.Preconditions;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -31,7 +31,6 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -64,44 +63,35 @@ import java.util.concurrent.TimeUnit;
 /**
  * Data consistency checker implementation.
  */
-@Getter
 @Slf4j
 public final class DataConsistencyCheckerImpl implements DataConsistencyChecker {
     
-    private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
-    
     private final JobConfiguration jobConfig;
     
-    private final RuleAlteredContext ruleAlteredContext;
-    
-    private final String jobId;
-    
     private final Collection<String> logicTableNames;
     
     public DataConsistencyCheckerImpl(final JobConfiguration jobConfig) {
         this.jobConfig = jobConfig;
-        ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
-        jobId = jobConfig.getHandleConfig().getJobId();
         logicTableNames = jobConfig.getHandleConfig().splitLogicTableNames();
     }
     
     @Override
-    public Map<String, DataConsistencyCheckResult> checkRecordsCount() {
-        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + getJobIdPrefix(jobId) + "-countCheck-%d");
+    public Map<String, DataConsistencyCountCheckResult> checkCount() {
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdPrefix(jobConfig.getHandleConfig().getJobId()) + "-count-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
             jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
         PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
             jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
-        try (PipelineDataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceDataSourceConfig);
-             PipelineDataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetDataSourceConfig)) {
-            Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
+        try (PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
+             PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
+            Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>();
             for (String each : logicTableNames) {
                 result.put(each, countCheck(each, sourceDataSource, targetDataSource, executor));
             }
             return result;
         } catch (final SQLException ex) {
-            throw new PipelineDataConsistencyCheckFailedException("count check failed", ex);
+            throw new PipelineDataConsistencyCheckFailedException("Count check failed", ex);
         } finally {
             executor.shutdown();
             executor.shutdownNow();
@@ -115,16 +105,16 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
         return jobId.substring(0, 6);
     }
     
-    private DataConsistencyCheckResult countCheck(
+    private DataConsistencyCountCheckResult countCheck(
             final String table, final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final ThreadPoolExecutor executor) {
         try {
             Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, table, sourceDataSource.getDatabaseType()));
             Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, table, targetDataSource.getDatabaseType()));
             long sourceCount = sourceFuture.get();
             long targetCount = targetFuture.get();
-            return new DataConsistencyCheckResult(sourceCount, targetCount);
+            return new DataConsistencyCountCheckResult(sourceCount, targetCount);
         } catch (final InterruptedException | ExecutionException ex) {
-            throw new PipelineDataConsistencyCheckFailedException(String.format("count check failed for table '%s'", table), ex);
+            throw new PipelineDataConsistencyCheckFailedException(String.format("Count check failed for table '%s'", table), ex);
         }
     }
     
@@ -135,12 +125,12 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
             resultSet.next();
             return resultSet.getLong(1);
         } catch (final SQLException ex) {
-            throw new PipelineDataConsistencyCheckFailedException(String.format("count for table '%s' failed", table), ex);
+            throw new PipelineDataConsistencyCheckFailedException(String.format("Count for table '%s' failed", table), ex);
         }
     }
     
     @Override
-    public Map<String, Boolean> checkRecordsContent(final DataConsistencyCalculateAlgorithm dataConsistencyCalculator) {
+    public Map<String, DataConsistencyContentCheckResult> checkContent(final DataConsistencyCalculateAlgorithm dataConsistencyCalculator) {
         Collection<String> supportedDatabaseTypes = dataConsistencyCalculator.getSupportedDatabaseTypes();
         PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
                 jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
@@ -149,17 +139,17 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
                 jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
         checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, targetDataSourceConfig.getDatabaseType().getName());
         addDataSourceConfigToMySQL(sourceDataSourceConfig, targetDataSourceConfig);
-        Map<String, Boolean> result = new HashMap<>();
-        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + getJobIdPrefix(jobId) + "-dataCheck-%d");
+        Map<String, DataConsistencyContentCheckResult> result = new HashMap<>();
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + getJobIdPrefix(jobConfig.getHandleConfig().getJobId()) + "-dataCheck-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
-        JobRateLimitAlgorithm inputRateLimitAlgorithm = ruleAlteredContext.getInputRateLimitAlgorithm();
-        try (PipelineDataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceDataSourceConfig);
-             PipelineDataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetDataSourceConfig)) {
+        JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
+        try (PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
+             PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
             Map<String, TableMetaData> tableMetaDataMap = getTableMetaDataMap(jobConfig.getWorkflowConfig().getSchemaName());
             logicTableNames.forEach(each -> {
                 //TODO put to preparer
                 if (!tableMetaDataMap.containsKey(each)) {
-                    throw new PipelineDataConsistencyCheckFailedException(String.format("could not get metadata for table '%s'", each));
+                    throw new PipelineDataConsistencyCheckFailedException(String.format("Could not get metadata for table '%s'", each));
                 }
             });
             String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getName();
@@ -172,7 +162,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
                 DataConsistencyCalculateParameter targetParameter = buildParameter(targetDataSource, each, columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
                 Iterator<Object> sourceCalculatedResults = dataConsistencyCalculator.calculate(sourceParameter).iterator();
                 Iterator<Object> targetCalculatedResults = dataConsistencyCalculator.calculate(targetParameter).iterator();
-                boolean calculateResultsEquals = true;
+                boolean contentMatched = true;
                 while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
                     if (null != inputRateLimitAlgorithm) {
                         inputRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
@@ -181,15 +171,15 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
                     Future<Object> targetFuture = executor.submit(targetCalculatedResults::next);
                     Object sourceCalculatedResult = sourceFuture.get();
                     Object targetCalculatedResult = targetFuture.get();
-                    calculateResultsEquals = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
-                    if (!calculateResultsEquals) {
+                    contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
+                    if (!contentMatched) {
                         break;
                     }
                 }
-                result.put(each, calculateResultsEquals);
+                result.put(each, new DataConsistencyContentCheckResult(contentMatched));
             }
         } catch (final ExecutionException | InterruptedException | SQLException ex) {
-            throw new PipelineDataConsistencyCheckFailedException("data check failed", ex);
+            throw new PipelineDataConsistencyCheckFailedException("Data check failed", ex);
         } finally {
             executor.shutdown();
             executor.shutdownNow();
@@ -199,16 +189,16 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
     
     private void checkDatabaseTypeSupportedOrNot(final Collection<String> supportedDatabaseTypes, final String databaseType) {
         if (!supportedDatabaseTypes.contains(databaseType)) {
-            throw new PipelineDataConsistencyCheckFailedException("database type " + databaseType + " is not supported in " + supportedDatabaseTypes);
+            throw new PipelineDataConsistencyCheckFailedException("Database type " + databaseType + " is not supported in " + supportedDatabaseTypes);
         }
     }
     
     private Map<String, TableMetaData> getTableMetaDataMap(final String schemaName) {
         ContextManager contextManager = PipelineContext.getContextManager();
-        Preconditions.checkNotNull(contextManager, "contextManager null");
+        Preconditions.checkNotNull(contextManager, "ContextManager null");
         ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData(schemaName);
         if (null == metaData) {
-            throw new RuntimeException("Can not get metaData by schemaName " + schemaName);
+            throw new RuntimeException("Can not get meta data by schema name " + schemaName);
         }
         return metaData.getDefaultSchema().getTables();
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingDataConsistencyCalculateAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
similarity index 97%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingDataConsistencyCalculateAlgorithm.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
index 9c0987533d2..d866a90a456 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingDataConsistencyCalculateAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
similarity index 97%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithm.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index 8128b6cda49..0adf19036aa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCalculateAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
similarity index 99%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCalculateAlgorithm.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index bf69453c271..a00b0ab406c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
 
 import lombok.Getter;
 import lombok.NonNull;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
index 2109afc1dd8..e9ff3c18602 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
@@ -17,6 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.datasource;
 
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -28,6 +30,7 @@ import java.sql.SQLException;
 /**
  * Pipeline data source factory.
  */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class PipelineDataSourceFactory {
     
     /**
@@ -37,7 +40,7 @@ public final class PipelineDataSourceFactory {
      * @return new data source wrapper
      */
     @SneakyThrows(SQLException.class)
-    public PipelineDataSourceWrapper newInstance(final PipelineDataSourceConfiguration dataSourceConfig) {
+    public static PipelineDataSourceWrapper newInstance(final PipelineDataSourceConfiguration dataSourceConfig) {
         DataSource pipelineDataSource = PipelineDataSourceCreatorFactory.getInstance(dataSourceConfig.getType()).createPipelineDataSource(dataSourceConfig.getDataSourceConfiguration());
         return new PipelineDataSourceWrapper(pipelineDataSource, dataSourceConfig.getDatabaseType());
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
index 2754bb73b0c..8659174fe4b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.datasource;
 
-import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -29,12 +28,9 @@ import java.util.concurrent.ConcurrentHashMap;
 /**
  * Pipeline data source manager.
  */
-@NoArgsConstructor
 @Slf4j
 public final class PipelineDataSourceManager implements AutoCloseable {
     
-    private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
-    
     private final Map<PipelineDataSourceConfiguration, PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
     
     /**
@@ -53,7 +49,7 @@ public final class PipelineDataSourceManager implements AutoCloseable {
             if (null != result) {
                 return result;
             }
-            result = dataSourceFactory.newInstance(dataSourceConfig);
+            result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
             cachedDataSources.put(dataSourceConfig, result);
             return result;
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelFactory.java
similarity index 88%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelFactory.java
index 01c69f94dfc..2f119b90882 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelFactory.java
@@ -15,15 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel;
+package org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory;
 
 import com.google.common.base.Strings;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 
 import java.util.Properties;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 07311cda059..d09647b5969 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -113,8 +112,7 @@ public final class FinishedCheckJob implements SimpleJob {
     private boolean dataConsistencyCheck(final JobConfiguration jobConfig) {
         String jobId = jobConfig.getHandleConfig().getJobId();
         log.info("dataConsistencyCheck for job {}", jobId);
-        Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobConfig);
-        return ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, checkResultMap);
+        return ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, ruleAlteredJobAPI.dataConsistencyCheck(jobConfig));
     }
     
     private void switchClusterConfiguration(final String schemaName, final JobConfiguration jobConfig, final RuleBasedJobLockAlgorithm checkoutLockAlgorithm) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index b188c4a3771..b03a1e92140 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index bcb6b3c3b7e..772ba8e478b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -36,8 +36,6 @@ import java.util.Collection;
 @Slf4j
 public final class ScalingEnvironmentManager {
     
-    private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
-    
     /**
      * Cleanup target tables.
      *
@@ -49,7 +47,7 @@ public final class ScalingEnvironmentManager {
         Collection<String> tables = jobConfig.getHandleConfig().splitLogicTableNames();
         log.info("cleanupTargetTables, tables={}", tables);
         YamlPipelineDataSourceConfiguration target = jobConfig.getPipelineConfig().getTarget();
-        try (PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
+        try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
              Connection connection = dataSource.getConnection()) {
             for (String each : tables) {
                 String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
index b095d5571b0..d122b3da26d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
@@ -15,5 +15,5 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCalculateAlgorithm
-org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.DataMatchDataConsistencyCalculateAlgorithm
+org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.CRC32MatchDataConsistencyCalculateAlgorithm
+org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.DataMatchDataConsistencyCalculateAlgorithm
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
index 7c0c3d65284..976e49b8443 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory
+org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelFactory
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithmTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
similarity index 97%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index 791b7d223c7..0fd9da9f68b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/fixture/FixturePipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
similarity index 96%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index 82b127e7baa..d76a21aaf8e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.fixture;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.fixture;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
index b095d5571b0..d122b3da26d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
@@ -15,5 +15,5 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCalculateAlgorithm
-org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.DataMatchDataConsistencyCalculateAlgorithm
+org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.CRC32MatchDataConsistencyCalculateAlgorithm
+org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.DataMatchDataConsistencyCalculateAlgorithm
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index df902547e48..ab29ad1625d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.fixture.FixturePipelineSQLBuilder
+org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.fixture.FixturePipelineSQLBuilder
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index 8a31e1ff43b..16e0d0d7253 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -147,10 +147,10 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      * Aggregate data consistency check results.
      *
      * @param jobId job id
-     * @param checkResultMap check result map
+     * @param checkResults check results
      * @return check success or not
      */
-    boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, DataConsistencyCheckResult> checkResultMap);
+    boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, DataConsistencyCheckResult> checkResults);
     
     /**
      * Switch job source schema's configuration to job target configuration.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
index 895ecac0c7b..04452249d44 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
@@ -18,28 +18,18 @@
 package org.apache.shardingsphere.data.pipeline.api.check.consistency;
 
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 
 /**
  * Data consistency check result.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
-@ToString
+@ToString(callSuper = true)
 public final class DataConsistencyCheckResult {
     
-    private final long sourceRecordsCount;
+    private final DataConsistencyCountCheckResult countCheckResult;
     
-    private final long targetRecordsCount;
-    
-    private final boolean recordsCountMatched;
-    
-    private boolean recordsContentMatched;
-    
-    public DataConsistencyCheckResult(final long sourceRecordsCount, final long targetRecordsCount) {
-        this.sourceRecordsCount = sourceRecordsCount;
-        this.targetRecordsCount = targetRecordsCount;
-        recordsCountMatched = sourceRecordsCount == targetRecordsCount;
-    }
+    private final DataConsistencyContentCheckResult contentCheckResult;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyContentCheckResult.java
similarity index 61%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyContentCheckResult.java
index 895ecac0c7b..5e87aad5999 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyContentCheckResult.java
@@ -18,28 +18,16 @@
 package org.apache.shardingsphere.data.pipeline.api.check.consistency;
 
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 
 /**
- * Data consistency check result.
+ * Data consistency content check result.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
 @ToString
-public final class DataConsistencyCheckResult {
+public final class DataConsistencyContentCheckResult {
     
-    private final long sourceRecordsCount;
-    
-    private final long targetRecordsCount;
-    
-    private final boolean recordsCountMatched;
-    
-    private boolean recordsContentMatched;
-    
-    public DataConsistencyCheckResult(final long sourceRecordsCount, final long targetRecordsCount) {
-        this.sourceRecordsCount = sourceRecordsCount;
-        this.targetRecordsCount = targetRecordsCount;
-        recordsCountMatched = sourceRecordsCount == targetRecordsCount;
-    }
+    private final boolean matched;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCountCheckResult.java
similarity index 75%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCountCheckResult.java
index 895ecac0c7b..d166dba8a1d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCountCheckResult.java
@@ -18,28 +18,24 @@
 package org.apache.shardingsphere.data.pipeline.api.check.consistency;
 
 import lombok.Getter;
-import lombok.Setter;
 import lombok.ToString;
 
 /**
- * Data consistency check result.
+ * Data consistency count check result.
  */
 @Getter
-@Setter
 @ToString
-public final class DataConsistencyCheckResult {
+public final class DataConsistencyCountCheckResult {
     
     private final long sourceRecordsCount;
     
     private final long targetRecordsCount;
     
-    private final boolean recordsCountMatched;
+    private final boolean matched;
     
-    private boolean recordsContentMatched;
-    
-    public DataConsistencyCheckResult(final long sourceRecordsCount, final long targetRecordsCount) {
+    public DataConsistencyCountCheckResult(final long sourceRecordsCount, final long targetRecordsCount) {
         this.sourceRecordsCount = sourceRecordsCount;
         this.targetRecordsCount = targetRecordsCount;
-        recordsCountMatched = sourceRecordsCount == targetRecordsCount;
+        matched = sourceRecordsCount == targetRecordsCount;
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java
index 4001d83e04b..bf2fbe1a44c 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfigu
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.integration.scaling.test.mysql.env.cases.DataSet;
 import org.apache.shardingsphere.integration.scaling.test.mysql.env.cases.Type;
 import org.apache.shardingsphere.integration.scaling.test.mysql.env.config.SourceConfiguration;
@@ -59,7 +60,7 @@ public final class ITEnvironmentContext {
         Map<String, YamlTableRuleConfiguration> sourceTableRules = createSourceTableRules();
         scalingConfiguration = createScalingConfiguration(sourceTableRules);
         sourceDataSource = SourceConfiguration.createHostDataSource(sourceTableRules);
-        targetDataSource = TargetConfiguration.createHostDataSource();
+        targetDataSource = PipelineDataSourceFactory.newInstance(TargetConfiguration.getHostConfiguration());
     }
     
     @SneakyThrows
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/config/TargetConfiguration.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/config/TargetConfiguration.java
index 8773569d661..ad78c1f7f5c 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/config/TargetConfiguration.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/config/TargetConfiguration.java
@@ -18,10 +18,8 @@
 package org.apache.shardingsphere.integration.scaling.test.mysql.env.config;
 
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.integration.scaling.test.mysql.env.IntegrationTestEnvironment;
 
-import javax.sql.DataSource;
 import java.util.Properties;
 
 /**
@@ -54,13 +52,4 @@ public final class TargetConfiguration {
     private static StandardPipelineDataSourceConfiguration getConfiguration(final String host) {
         return new StandardPipelineDataSourceConfiguration(String.format(TARGET_JDBC_URL, host), ENGINE_ENV_PROPS.getProperty("db.username"), ENGINE_ENV_PROPS.getProperty("db.password"));
     }
-    
-    /**
-     * Create host standard pipeline data source.
-     *
-     * @return data source
-     */
-    public static DataSource createHostDataSource() {
-        return new PipelineDataSourceFactory().newInstance(getHostConfiguration());
-    }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 4081ceb976a..54a339c5f41 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -17,11 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
-import com.google.common.collect.ImmutableMap;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -44,6 +45,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -145,31 +147,46 @@ public final class RuleAlteredJobAPIImplTest {
         Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE");
         ruleAlteredJobAPI.restoreClusterWriteDB(schemaName, jobId.get());
         assertThat(checkResultMap.size(), is(1));
-        assertTrue(checkResultMap.get("t_order").isRecordsCountMatched());
-        assertTrue(checkResultMap.get("t_order").isRecordsContentMatched());
-        assertThat(checkResultMap.get("t_order").getTargetRecordsCount(), is(2L));
+        assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
+        assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(), is(2L));
+        assertTrue(checkResultMap.get("t_order").getContentCheckResult().isMatched());
     }
     
     @Test
-    public void assertAggregateDataConsistencyCheckResults() {
-        String jobId = "1";
-        Map<String, DataConsistencyCheckResult> checkResultMap;
-        checkResultMap = Collections.emptyMap();
-        assertThat(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, checkResultMap), is(false));
-        DataConsistencyCheckResult trueResult = new DataConsistencyCheckResult(1, 1);
-        trueResult.setRecordsContentMatched(true);
-        DataConsistencyCheckResult checkResult;
-        checkResult = new DataConsistencyCheckResult(100, 95);
-        checkResultMap = ImmutableMap.<String, DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order", checkResult).build();
-        assertThat(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, checkResultMap), is(false));
-        checkResult = new DataConsistencyCheckResult(100, 100);
-        checkResult.setRecordsContentMatched(false);
-        checkResultMap = ImmutableMap.<String, DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order", checkResult).build();
-        assertThat(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, checkResultMap), is(false));
-        checkResult = new DataConsistencyCheckResult(100, 100);
-        checkResult.setRecordsContentMatched(true);
-        checkResultMap = ImmutableMap.<String, DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order", checkResult).build();
-        assertThat(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, checkResultMap), is(true));
+    public void assertAggregateEmptyDataConsistencyCheckResults() {
+        assertThat(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", Collections.emptyMap()), is(false));
+    }
+    
+    @Test
+    public void assertAggregateDifferentCountDataConsistencyCheckResults() {
+        DataConsistencyCountCheckResult equalCountCheckResult = new DataConsistencyCountCheckResult(100, 100);
+        DataConsistencyCountCheckResult notEqualCountCheckResult = new DataConsistencyCountCheckResult(100, 95);
+        DataConsistencyContentCheckResult equalContentCheckResult = new DataConsistencyContentCheckResult(true);
+        Map<String, DataConsistencyCheckResult> checkResults = new LinkedHashMap<>(2, 1);
+        checkResults.put("foo_tbl", new DataConsistencyCheckResult(equalCountCheckResult, equalContentCheckResult));
+        checkResults.put("bar_tbl", new DataConsistencyCheckResult(notEqualCountCheckResult, equalContentCheckResult));
+        assertFalse(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults));
+    }
+    
+    @Test
+    public void assertAggregateDifferentContentDataConsistencyCheckResults() {
+        DataConsistencyCountCheckResult equalCountCheckResult = new DataConsistencyCountCheckResult(100, 100);
+        DataConsistencyContentCheckResult equalContentCheckResult = new DataConsistencyContentCheckResult(true);
+        DataConsistencyContentCheckResult notEqualContentCheckResult = new DataConsistencyContentCheckResult(false);
+        Map<String, DataConsistencyCheckResult> checkResults = new LinkedHashMap<>(2, 1);
+        checkResults.put("foo_tbl", new DataConsistencyCheckResult(equalCountCheckResult, equalContentCheckResult));
+        checkResults.put("bar_tbl", new DataConsistencyCheckResult(equalCountCheckResult, notEqualContentCheckResult));
+        assertFalse(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults));
+    }
+    
+    @Test
+    public void assertAggregateSameDataConsistencyCheckResults() {
+        DataConsistencyCountCheckResult equalCountCheckResult = new DataConsistencyCountCheckResult(100, 100);
+        DataConsistencyContentCheckResult equalContentCheckResult = new DataConsistencyContentCheckResult(true);
+        Map<String, DataConsistencyCheckResult> checkResults = new LinkedHashMap<>(2, 1);
+        checkResults.put("foo_tbl", new DataConsistencyCheckResult(equalCountCheckResult, equalContentCheckResult));
+        checkResults.put("bar_tbl", new DataConsistencyCheckResult(equalCountCheckResult, equalContentCheckResult));
+        assertTrue(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults));
     }
     
     @Test(expected = PipelineVerifyFailedException.class)
@@ -214,7 +231,7 @@ public final class RuleAlteredJobAPIImplTest {
         ruleAlteredJobAPI.stop(jobId.get());
         ruleAlteredJobAPI.reset(jobId.get());
         Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobConfig);
-        assertThat(checkResultMap.get("t_order").getTargetRecordsCount(), is(0L));
+        assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(), is(0L));
     }
     
     @SneakyThrows(SQLException.class)
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
index 5018da801b7..df560ed78f4 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
@@ -18,7 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCalculateAlgorithm;
@@ -56,11 +57,11 @@ public final class DataConsistencyCheckerImplTest {
         initTableData(jobContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
         initTableData(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
         DataConsistencyChecker dataConsistencyChecker = EnvironmentCheckerFactory.newInstance(jobContext.getJobConfig());
-        Map<String, DataConsistencyCheckResult> resultMap = dataConsistencyChecker.checkRecordsCount();
-        assertTrue(resultMap.get("t_order").isRecordsCountMatched());
-        assertThat(resultMap.get("t_order").getSourceRecordsCount(), is(resultMap.get("t_order").getTargetRecordsCount()));
-        Map<String, Boolean> dataCheckResultMap = dataConsistencyChecker.checkRecordsContent(new FixtureDataConsistencyCalculateAlgorithm());
-        assertTrue(dataCheckResultMap.get("t_order"));
+        Map<String, DataConsistencyCountCheckResult> countCheckResults = dataConsistencyChecker.checkCount();
+        assertTrue(countCheckResults.get("t_order").isMatched());
+        assertThat(countCheckResults.get("t_order").getSourceRecordsCount(), is(countCheckResults.get("t_order").getTargetRecordsCount()));
+        Map<String, DataConsistencyContentCheckResult> contentCheckResults = dataConsistencyChecker.checkContent(new FixtureDataConsistencyCalculateAlgorithm());
+        assertTrue(contentCheckResults.get("t_order").isMatched());
     }
     
     @SneakyThrows(SQLException.class)
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index bb1ad78d0c7..91cbb4f749d 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
-import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -94,7 +94,7 @@ public final class PipelineContextUtil {
         }
         ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig = new ShardingSpherePipelineDataSourceConfiguration(
                 ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"));
-        ShardingSphereDataSource shardingSphereDataSource = (ShardingSphereDataSource) new PipelineDataSourceFactory().newInstance(pipelineDataSourceConfig).getDataSource();
+        ShardingSphereDataSource shardingSphereDataSource = (ShardingSphereDataSource) PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig).getDataSource();
         ContextManager contextManager = shardingSphereDataSource.getContextManager();
         MetaDataPersistService metaDataPersistService = new MetaDataPersistService(getClusterPersistRepository());
         MetaDataContexts metaDataContexts = renewMetaDataContexts(contextManager.getMetaDataContexts(), metaDataPersistService);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
index 5c3b2533e0d..df17842de36 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
@@ -62,7 +62,7 @@ public final class RuleAlteredJobTest {
     @SneakyThrows(SQLException.class)
     private void initTableData(final JobConfiguration jobConfig) {
         YamlPipelineDataSourceConfiguration source = jobConfig.getPipelineConfig().getSource();
-        try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceFactory().newInstance(PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter()));
+        try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter()));
              Connection connection = dataSource.getConnection();
              Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");