You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/09/23 08:17:55 UTC

[shardingsphere] branch master updated: Extract InventoryIncrementalJobAPI and extend InventoryIncrementalJobPublicAPI for common usage (#21152)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0574a476e84 Extract InventoryIncrementalJobAPI and extend InventoryIncrementalJobPublicAPI for common usage (#21152)
0574a476e84 is described below

commit 0574a476e84a8c7dbea13355f3b941f1d657a165
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Sep 23 16:17:47 2022 +0800

    Extract InventoryIncrementalJobAPI and extend InventoryIncrementalJobPublicAPI for common usage (#21152)
    
    * Extract more methods to InventoryIncrementalJobPublicAPI; Add InventoryIncrementalJobAPI
    
    * Add PipelineDataConsistencyChecker
    
    * Rename AbstractInventoryIncrementalJobAPIImpl
---
 .../api/InventoryIncrementalJobPublicAPI.java      |  43 ++++-
 .../data/pipeline/api/MigrationJobPublicAPI.java   |  38 ----
 .../PipelineDataConsistencyChecker.java}           |  25 +--
 .../api/InventoryIncrementalJobAPI.java}           |  30 +--
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 203 +++++++++++++++++++++
 .../impl/InventoryIncrementalJobPublicAPIImpl.java |  74 --------
 .../MigrationChangedJobConfigurationProcessor.java |   7 +-
 .../migration/MigrationDataConsistencyChecker.java |  16 +-
 .../scenario/migration/MigrationJobAPI.java        |  36 +---
 .../scenario/migration/MigrationJobAPIImpl.java    | 122 ++-----------
 .../migration/MigrationProcessContext.java         |   6 +
 .../MigrationDataConsistencyCheckerTest.java       |   4 +-
 12 files changed, 299 insertions(+), 305 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index 53b89e72f35..5750e196aca 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -17,14 +17,22 @@
 
 package org.apache.shardingsphere.data.pipeline.api;
 
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
 
 /**
- * Inventory incremental job API.
+ * Inventory incremental job public API.
  */
+@SingletonSPI
 public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI, TypedSPI {
     
     /**
@@ -70,4 +78,37 @@ public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI,
      * @throws SQLException when commit underlying database data
      */
     void commit(String jobId) throws SQLException;
+    
+    /**
+     * Get job progress.
+     *
+     * @param jobId job id
+     * @return each sharding item progress
+     */
+    Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(String jobId);
+    
+    /**
+     * List all data consistency check algorithms from SPI.
+     *
+     * @return data consistency check algorithms
+     */
+    Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms();
+    
+    /**
+     * Do data consistency check.
+     *
+     * @param jobId job id
+     * @return each logic table check result
+     */
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
+    
+    /**
+     * Do data consistency check.
+     *
+     * @param jobId job id
+     * @param algorithmType algorithm type
+     * @param algorithmProps algorithm props. Nullable
+     * @return each logic table check result
+     */
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType, Properties algorithmProps);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index e60b48bfe92..fb8fe96405e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -17,10 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.api;
 
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
-import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
@@ -29,7 +26,6 @@ import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 /**
  * Migration job public API.
@@ -45,40 +41,6 @@ public interface MigrationJobPublicAPI extends InventoryIncrementalJobPublicAPI,
     @Override
     List<MigrationJobInfo> list();
     
-    /**
-     * Get job progress.
-     *
-     * @param jobId job id
-     * @return each sharding item progress
-     */
-    // TODO add JobProgress
-    Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(String jobId);
-    
-    /**
-     * List all data consistency check algorithms from SPI.
-     *
-     * @return data consistency check algorithms
-     */
-    Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms();
-    
-    /**
-     * Do data consistency check.
-     *
-     * @param jobId job id
-     * @return each logic table check result
-     */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
-    
-    /**
-     * Do data consistency check.
-     *
-     * @param jobId job id
-     * @param algorithmType algorithm type
-     * @param algorithmProps algorithm props. Nullable
-     * @return each logic table check result
-     */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType, Properties algorithmProps);
-    
     /**
      * Add migration source resources.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/PipelineDataConsistencyChecker.java
similarity index 56%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/PipelineDataConsistencyChecker.java
index b2e11b30c15..92420d82c63 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/PipelineDataConsistencyChecker.java
@@ -15,21 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
 
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.context.AbstractInventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.util.Map;
 
 /**
- * Migration process context.
+ * Pipeline data consistency checker.
  */
-@Getter
-@Slf4j
-public final class MigrationProcessContext extends AbstractInventoryIncrementalProcessContext {
+public interface PipelineDataConsistencyChecker {
     
-    public MigrationProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
-        super(jobId, originalProcessConfig);
-    }
+    /**
+     * Data consistency check.
+     *
+     * @param calculateAlgorithm calculate algorithm
+     * @return check results. key is logic table name, value is check result.
+     */
+    Map<String, DataConsistencyCheckResult> check(DataConsistencyCalculateAlgorithm calculateAlgorithm);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
similarity index 60%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 982a99f2e9e..fdf355a8a4f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -15,42 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.core.api;
 
-import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 
 import java.util.Map;
 
 /**
- * Migration job API.
+ * Inventory incremental job API.
  */
-@SingletonSPI
-public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI, RequiredSPI {
-    
-    @Override
-    MigrationJobConfiguration getJobConfiguration(String jobId);
-    
-    @Override
-    MigrationTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
-    
-    @Override
-    MigrationProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
     
     /**
      * Get job progress.
      *
-     * @param jobConfig job configuration
+     * @param pipelineJobConfig job configuration
      * @return each sharding item progress
      */
-    Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(MigrationJobConfiguration jobConfig);
+    Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(PipelineJobConfiguration pipelineJobConfig);
     
     @Override
     InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
@@ -58,10 +42,10 @@ public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI,
     /**
      * Do data consistency check.
      *
-     * @param jobConfig job configuration
+     * @param pipelineJobConfig job configuration
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(MigrationJobConfiguration jobConfig);
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig);
     
     /**
      * Aggregate data consistency check results.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
new file mode 100644
index 00000000000..d0be204a7d0
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Abstract inventory incremental job API implementation.
+ */
+@Slf4j
+public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobAPI, InventoryIncrementalJobPublicAPI {
+    
+    private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+    
+    private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
+    
+    private final InventoryIncrementalJobItemAPIImpl jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
+    
+    protected abstract String getTargetDatabaseType(PipelineJobConfiguration pipelineJobConfig);
+    
+    @Override
+    public abstract InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+    
+    @Override
+    public void createProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+        PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
+        ShardingSpherePreconditions.checkState(null == existingProcessConfig, CreateExistsProcessConfigurationException::new);
+        processConfigPersistService.persist(getJobType(), processConfig);
+    }
+    
+    @Override
+    public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+        // TODO check rateLimiter type match or not
+        YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
+        targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
+        processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+    }
+    
+    private YamlPipelineProcessConfiguration getTargetYamlProcessConfiguration() {
+        PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
+        ShardingSpherePreconditions.checkNotNull(existingProcessConfig, AlterNotExistProcessConfigurationException::new);
+        return PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
+    }
+    
+    @Override
+    public void dropProcessConfiguration(final String confPath) {
+        String finalConfPath = confPath.trim();
+        PipelineProcessConfigurationUtil.verifyConfPath(confPath);
+        YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
+        PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig, finalConfPath);
+        processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+    }
+    
+    @Override
+    public PipelineProcessConfiguration showProcessConfiguration() {
+        PipelineProcessConfiguration result = processConfigPersistService.load(getJobType());
+        result = PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
+        return result;
+    }
+    
+    @Override
+    public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final String jobId) {
+        checkModeConfig();
+        return getJobProgress(getJobConfiguration(jobId));
+    }
+    
+    @Override
+    public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final PipelineJobConfiguration jobConfig) {
+        String jobId = jobConfig.getJobId();
+        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
+            InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, each);
+            if (null != jobItemProgress) {
+                jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
+                jobItemProgress.setErrorMessage(getJobItemErrorMessage(jobId, each));
+            }
+            map.put(each, jobItemProgress);
+        }, LinkedHashMap::putAll);
+    }
+    
+    @Override
+    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+        return jobItemAPI.getJobItemProgress(jobId, shardingItem);
+    }
+    
+    @Override
+    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+        jobItemAPI.persistJobItemProgress(jobItemContext);
+    }
+    
+    @Override
+    public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
+        jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
+    }
+    
+    @Override
+    public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
+        checkModeConfig();
+        return DataConsistencyCalculateAlgorithmFactory.getAllInstances().stream().map(each -> {
+            DataConsistencyCheckAlgorithmInfo result = new DataConsistencyCheckAlgorithmInfo();
+            result.setType(each.getType());
+            result.setDescription(each.getDescription());
+            result.setSupportedDatabaseTypes(each.getSupportedDatabaseTypes());
+            return result;
+        }).collect(Collectors.toList());
+    }
+    
+    @Override
+    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
+        checkModeConfig();
+        log.info("Data consistency check for job {}", jobId);
+        PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
+        return dataConsistencyCheck(jobConfig);
+    }
+    
+    @Override
+    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig) {
+        DataConsistencyCalculateAlgorithm algorithm = DataConsistencyCalculateAlgorithmChooser.choose(
+                DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
+        return dataConsistencyCheck(jobConfig, algorithm);
+    }
+    
+    @Override
+    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType, final Properties algorithmProps) {
+        checkModeConfig();
+        log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
+        PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
+        return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps));
+    }
+    
+    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+        String jobId = jobConfig.getJobId();
+        Map<String, DataConsistencyCheckResult> result = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
+        log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculateAlgorithm.getType(), result);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
+        return result;
+    }
+    
+    protected abstract PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext);
+    
+    @Override
+    public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
+        if (checkResults.isEmpty()) {
+            return false;
+        }
+        for (Entry<String, DataConsistencyCheckResult> entry : checkResults.entrySet()) {
+            DataConsistencyCheckResult checkResult = entry.getValue();
+            boolean isCountMatched = checkResult.getCountCheckResult().isMatched();
+            boolean isContentMatched = checkResult.getContentCheckResult().isMatched();
+            if (!isCountMatched || !isContentMatched) {
+                log.error("job: {}, table: {} data consistency check failed, count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched, isContentMatched);
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
deleted file mode 100644
index 44f48883545..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.api.impl;
-
-import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-
-/**
- * Inventory incremental job API implementation.
- */
-public abstract class InventoryIncrementalJobPublicAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobPublicAPI {
-    
-    private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
-    
-    private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
-    
-    @Override
-    public void createProcessConfiguration(final PipelineProcessConfiguration processConfig) {
-        PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
-        ShardingSpherePreconditions.checkState(null == existingProcessConfig, CreateExistsProcessConfigurationException::new);
-        processConfigPersistService.persist(getJobType(), processConfig);
-    }
-    
-    @Override
-    public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
-        // TODO check rateLimiter type match or not
-        YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
-        targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
-        processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
-    }
-    
-    private YamlPipelineProcessConfiguration getTargetYamlProcessConfiguration() {
-        PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
-        ShardingSpherePreconditions.checkNotNull(existingProcessConfig, AlterNotExistProcessConfigurationException::new);
-        return PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
-    }
-    
-    @Override
-    public void dropProcessConfiguration(final String confPath) {
-        String finalConfPath = confPath.trim();
-        PipelineProcessConfigurationUtil.verifyConfPath(confPath);
-        YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
-        PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig, finalConfPath);
-        processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
-    }
-    
-    @Override
-    public PipelineProcessConfiguration showProcessConfiguration() {
-        PipelineProcessConfiguration result = processConfigPersistService.load(getJobType());
-        result = PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
-        return result;
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 1333121ec89..8f5b30e7eef 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -52,8 +52,11 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
                     log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
                     log.info("{} executing jobs", jobId);
-                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor())
-                            .whenComplete((unused, throwable) -> log.error("execute failed, jobId={}", jobId, throwable));
+                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+                        if (null != throwable) {
+                            log.error("execute failed, jobId={}", jobId, throwable);
+                        }
+                    });
                 }
                 break;
             case DELETED:
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index a196b56e552..a0fd5c4ae5a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -30,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
@@ -69,7 +71,7 @@ import java.util.concurrent.TimeUnit;
  * Data consistency checker for migration job.
  */
 @Slf4j
-public final class MigrationDataConsistencyChecker {
+public final class MigrationDataConsistencyChecker implements PipelineDataConsistencyChecker {
     
     private final MigrationJobConfiguration jobConfig;
     
@@ -81,21 +83,16 @@ public final class MigrationDataConsistencyChecker {
     
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
-    public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
+    public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext) {
         this.jobConfig = jobConfig;
         sourceTableName = jobConfig.getSourceTableName();
         targetTableName = jobConfig.getTargetTableName();
         tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
                 new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
-        this.readRateLimitAlgorithm = readRateLimitAlgorithm;
+        this.readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
     }
     
-    /**
-     * Check data consistency.
-     *
-     * @param calculator data consistency calculate algorithm
-     * @return checked result. key is logic table name, value is check result.
-     */
+    @Override
     public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculator) {
         Map<String, DataConsistencyCountCheckResult> countCheckResult = checkCount();
         Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
@@ -113,6 +110,7 @@ public final class MigrationDataConsistencyChecker {
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
         PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
+        // TODO migration might support multiple tables
         Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(1, 1);
         try (
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 982a99f2e9e..efa627cb5e4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -18,22 +18,18 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 
-import java.util.Map;
-
 /**
  * Migration job API.
  */
 @SingletonSPI
-public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI, RequiredSPI {
+public interface MigrationJobAPI extends InventoryIncrementalJobAPI, MigrationJobPublicAPI, RequiredSPI {
     
     @Override
     MigrationJobConfiguration getJobConfiguration(String jobId);
@@ -43,32 +39,4 @@ public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI,
     
     @Override
     MigrationProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
-    
-    /**
-     * Get job progress.
-     *
-     * @param jobConfig job configuration
-     * @return each sharding item progress
-     */
-    Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(MigrationJobConfiguration jobConfig);
-    
-    @Override
-    InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
-    
-    /**
-     * Do data consistency check.
-     *
-     * @param jobConfig job configuration
-     * @return each logic table check result
-     */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(MigrationJobConfiguration jobConfig);
-    
-    /**
-     * Aggregate data consistency check results.
-     *
-     * @param jobId job id
-     * @param checkResults check results
-     * @return check success or not
-     */
-    boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, DataConsistencyCheckResult> checkResults);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index b4419dafa05..027c13f1a40 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -22,7 +22,7 @@ import com.google.common.base.Strings;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
 import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
@@ -34,7 +34,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigration
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -43,10 +42,8 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
@@ -55,15 +52,10 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
-import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
-import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobPublicAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
-import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
-import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.connection.AddMigrationSourceResourceException;
@@ -72,8 +64,6 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSche
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
-import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
@@ -107,23 +97,19 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * Migration job API impl.
  */
 @Slf4j
-public final class MigrationJobAPIImpl extends InventoryIncrementalJobPublicAPIImpl implements MigrationJobAPI {
+public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAPIImpl implements MigrationJobAPI {
     
     private static final YamlRuleConfigurationSwapperEngine RULE_CONFIG_SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
     
     private static final YamlDataSourceConfigurationSwapper DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
     
-    private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
-    
     private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
     
     @Override
@@ -203,6 +189,11 @@ public final class MigrationJobAPIImpl extends InventoryIncrementalJobPublicAPII
         return YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
     }
     
+    @Override
+    protected String getTargetDatabaseType(final PipelineJobConfiguration pipelineJobConfig) {
+        return ((MigrationJobConfiguration) pipelineJobConfig).getTargetDatabaseType();
+    }
+    
     @Override
     public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
@@ -265,99 +256,8 @@ public final class MigrationJobAPIImpl extends InventoryIncrementalJobPublicAPII
     }
     
     @Override
-    public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final String jobId) {
-        checkModeConfig();
-        return getJobProgress(getJobConfiguration(jobId));
-    }
-    
-    @Override
-    public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final MigrationJobConfiguration jobConfig) {
-        String jobId = jobConfig.getJobId();
-        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
-            InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, each);
-            if (null != jobItemProgress) {
-                jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
-                jobItemProgress.setErrorMessage(getJobItemErrorMessage(jobId, each));
-            }
-            map.put(each, jobItemProgress);
-        }, LinkedHashMap::putAll);
-    }
-    
-    @Override
-    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
-        return (InventoryIncrementalJobItemProgress) jobItemAPI.getJobItemProgress(jobId, shardingItem);
-    }
-    
-    @Override
-    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
-        jobItemAPI.persistJobItemProgress(jobItemContext);
-    }
-    
-    @Override
-    public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
-        jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
-    }
-    
-    @Override
-    public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
-        checkModeConfig();
-        return DataConsistencyCalculateAlgorithmFactory.getAllInstances().stream().map(each -> {
-            DataConsistencyCheckAlgorithmInfo result = new DataConsistencyCheckAlgorithmInfo();
-            result.setType(each.getType());
-            result.setDescription(each.getDescription());
-            result.setSupportedDatabaseTypes(each.getSupportedDatabaseTypes());
-            return result;
-        }).collect(Collectors.toList());
-    }
-    
-    @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
-        checkModeConfig();
-        log.info("Data consistency check for job {}", jobId);
-        MigrationJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        return dataConsistencyCheck(jobConfig);
-    }
-    
-    @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final MigrationJobConfiguration jobConfig) {
-        DataConsistencyCalculateAlgorithm algorithm = DataConsistencyCalculateAlgorithmChooser.choose(
-                DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()));
-        return dataConsistencyCheck(jobConfig, algorithm);
-    }
-    
-    @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType, final Properties algorithmProps) {
-        checkModeConfig();
-        log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
-        MigrationJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
-        return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps));
-    }
-    
-    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final MigrationJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
-        String jobId = jobConfig.getJobId();
-        JobRateLimitAlgorithm readRateLimitAlgorithm = buildPipelineProcessContext(jobConfig).getReadRateLimitAlgorithm();
-        Map<String, DataConsistencyCheckResult> result = new MigrationDataConsistencyChecker(jobConfig, readRateLimitAlgorithm).check(calculator);
-        log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getType(), result);
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
-        return result;
-    }
-    
-    @Override
-    public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
-        if (checkResults.isEmpty()) {
-            return false;
-        }
-        for (Entry<String, DataConsistencyCheckResult> entry : checkResults.entrySet()) {
-            DataConsistencyCheckResult checkResult = entry.getValue();
-            boolean isCountMatched = checkResult.getCountCheckResult().isMatched();
-            boolean isContentMatched = checkResult.getContentCheckResult().isMatched();
-            if (!isCountMatched || !isContentMatched) {
-                log.error("job: {}, table: {} data consistency check failed, count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched, isContentMatched);
-                return false;
-            }
-        }
-        return true;
+    protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext) {
+        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index b2e11b30c15..1122a40ecc0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -29,6 +29,12 @@ import org.apache.shardingsphere.data.pipeline.core.context.AbstractInventoryInc
 @Slf4j
 public final class MigrationProcessContext extends AbstractInventoryIncrementalProcessContext {
     
+    /**
+     * Constructor.
+     *
+     * @param jobId job id
+     * @param originalProcessConfig original process configuration, nullable
+     */
     public MigrationProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
         super(jobId, originalProcessConfig);
     }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index bf492b724a2..fa646106a05 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -45,7 +45,9 @@ public final class MigrationDataConsistencyCheckerTest {
     
     @Test
     public void assertCountAndDataCheck() throws SQLException {
-        Map<String, DataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(createJobConfiguration(), null).check(new DataConsistencyCalculateAlgorithmFixture());
+        MigrationJobConfiguration jobConfig = createJobConfiguration();
+        Map<String, DataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null))
+                .check(new DataConsistencyCalculateAlgorithmFixture());
         assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
         assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(), is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
         assertTrue(actual.get("t_order").getContentCheckResult().isMatched());