You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/09/30 03:28:35 UTC

[shardingsphere] branch master updated: Revise consistency check job code; Migration job drop cascade check jobs (#21273)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bc2d9b63428 Revise consistency check job code; Migration job drop cascade check jobs (#21273)
bc2d9b63428 is described below

commit bc2d9b634288205b7602fbe00d66161c8a585ffe
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Sep 30 11:28:25 2022 +0800

    Revise consistency check job code; Migration job drop cascade check jobs (#21273)
    
    * Add log
    
    * Revise consistency check job code
    
    * Revise consistency check job code
    
    * Migration job drop cascade check jobs
    
    * Update sequence range
    
    * Unit test
---
 .../pipeline/api/ConsistencyCheckJobPublicAPI.java |  2 +-
 .../job/ConsistencyCheckJobConfiguration.java      |  2 +-
 .../yaml/YamlConsistencyCheckJobConfiguration.java |  2 +-
 ...amlConsistencyCheckJobConfigurationSwapper.java |  4 +--
 .../job/progress/ConsistencyCheckJobProgress.java  |  4 ++-
 .../dumper/IncrementalDumperCreatorFactory.java    | 14 +++++++-
 .../pipeline/core/api/GovernanceRepositoryAPI.java | 16 ++++++---
 .../core/api/impl/GovernanceRepositoryAPIImpl.java | 32 ++++++++++-------
 ...mpletedConsistencyCheckJobExistsException.java} |  8 ++---
 .../yaml/YamlConsistencyCheckJobProgress.java      |  2 +-
 .../metadata/generator/PipelineDDLGenerator.java   |  4 ++-
 .../core/metadata/node/PipelineMetaDataNode.java   |  4 +--
 .../core/prepare/InventoryTaskSplitter.java        |  2 ++
 .../core/prepare/PipelineJobPreparerUtils.java     | 24 ++++++++-----
 .../data/pipeline/core/task/IncrementalTask.java   |  2 +-
 ...tencyCheckChangedJobConfigurationProcessor.java |  2 +-
 .../consistencycheck/ConsistencyCheckJob.java      |  3 +-
 .../consistencycheck/ConsistencyCheckJobAPI.java   |  1 -
 .../ConsistencyCheckJobAPIImpl.java                | 41 ++++++++++++----------
 .../consistencycheck/ConsistencyCheckJobId.java    | 20 ++++++-----
 .../ConsistencyCheckJobItemContext.java            |  2 +-
 .../pipeline/scenario/migration/MigrationJob.java  |  2 ++
 .../scenario/migration/MigrationJobAPIImpl.java    | 19 ++++++++++
 .../scenario/migration/MigrationJobPreparer.java   | 11 +++---
 .../api/impl/ConsistencyCheckJobAPIImplTest.java   | 15 ++++----
 25 files changed, 152 insertions(+), 86 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index 5e226ce1124..008f0002705 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -31,7 +31,7 @@ import java.util.Map;
 public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, RequiredSPI {
     
     /**
-     * Create job migration config and start.
+     * Create consistency check configuration and start job.
      *
      * @param parameter create consistency check job parameter
      * @return job id
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
index 8576a374cae..b80eab65ee1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
@@ -39,7 +39,7 @@ public final class ConsistencyCheckJobConfiguration implements PipelineJobConfig
     
     private final String algorithmTypeName;
     
-    private final Properties algorithmProperties;
+    private final Properties algorithmProps;
     
     @Override
     public String getSourceDatabaseType() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
index 53ec03e9c42..69ace660ef1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
@@ -39,7 +39,7 @@ public final class YamlConsistencyCheckJobConfiguration implements YamlPipelineJ
     
     private String algorithmTypeName;
     
-    private Properties algorithmProperties;
+    private Properties algorithmProps;
     
     @Override
     public String getTargetDatabaseName() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
index 64ce12e1029..f945d8d0804 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
@@ -34,13 +34,13 @@ public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlCo
         result.setJobId(data.getJobId());
         result.setParentJobId(data.getParentJobId());
         result.setAlgorithmTypeName(data.getAlgorithmTypeName());
-        result.setAlgorithmProperties(data.getAlgorithmProperties());
+        result.setAlgorithmProps(data.getAlgorithmProps());
         return result;
     }
     
     @Override
     public ConsistencyCheckJobConfiguration swapToObject(final YamlConsistencyCheckJobConfiguration yamlConfig) {
-        return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(), yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(), yamlConfig.getAlgorithmProperties());
+        return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(), yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(), yamlConfig.getAlgorithmProps());
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
index dbb318489b4..22a16efa840 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
@@ -19,13 +19,15 @@ package org.apache.shardingsphere.data.pipeline.api.job.progress;
 
 import lombok.Getter;
 import lombok.Setter;
+import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 
 /**
- * Data check job item progress.
+ * Data consistency check job progress.
  */
 @Getter
 @Setter
+@ToString
 public final class ConsistencyCheckJobProgress implements PipelineJobItemProgress {
     
     private JobStatus status = JobStatus.RUNNING;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
index f81e9fbcd61..1f39e23fd02 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
@@ -22,6 +22,8 @@ import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
+import java.util.Optional;
+
 /**
  * Incremental dumper creator factory.
  */
@@ -33,7 +35,7 @@ public class IncrementalDumperCreatorFactory {
     }
     
     /**
-     * Incremental dumper creator.
+     * Get incremental dumper creator.
      *
      * @param databaseType database type
      * @return incremental dumper creator
@@ -41,4 +43,14 @@ public class IncrementalDumperCreatorFactory {
     public static IncrementalDumperCreator getInstance(final String databaseType) {
         return TypedSPIRegistry.getRegisteredService(IncrementalDumperCreator.class, databaseType);
     }
+    
+    /**
+     * Find incremental dumper creator.
+     *
+     * @param databaseType database type
+     * @return incremental dumper creator optional
+     */
+    public static Optional<IncrementalDumperCreator> findInstance(final String databaseType) {
+        return TypedSPIRegistry.findRegisteredService(IncrementalDumperCreator.class, databaseType);
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index edbc645a141..e7dc53f1050 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -66,7 +66,7 @@ public interface GovernanceRepositoryAPI {
     Optional<String> getCheckLatestJobId(String jobId);
     
     /**
-     * Persist check latest result.
+     * Persist check latest job id.
      *
      * @param jobId job id
      * @param checkJobId check job id
@@ -83,13 +83,21 @@ public interface GovernanceRepositoryAPI {
     Map<String, DataConsistencyCheckResult> getCheckJobResult(String jobId, String checkJobId);
     
     /**
-     * Persist check latest detailed result.
+     * Persist check job result.
      *
      * @param jobId job id
      * @param checkJobId check job id
-     * @param dataConsistencyCheckResult check result
+     * @param checkResultMap check result map
      */
-    void persistCheckJobResult(String jobId, String checkJobId, Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult);
+    void persistCheckJobResult(String jobId, String checkJobId, Map<String, DataConsistencyCheckResult> checkResultMap);
+    
+    /**
+     * Delete check job result.
+     *
+     * @param jobId job id
+     * @param checkJobId check job id
+     */
+    void deleteCheckJobResult(String jobId, String checkJobId);
     
     /**
      * List check job ids.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 3baf3db9f5f..329915e7485 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -79,30 +79,36 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     @Override
     public Map<String, DataConsistencyCheckResult> getCheckJobResult(final String jobId, final String checkJobId) {
         Map<String, DataConsistencyCheckResult> result = new HashMap<>();
-        String checkJobText = repository.get(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
-        if (StringUtils.isBlank(checkJobText)) {
+        String yamlCheckResultMapText = repository.get(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
+        if (StringUtils.isBlank(yamlCheckResultMapText)) {
             return Collections.emptyMap();
         }
-        Map<String, String> checkJobConfigMap = YamlEngine.unmarshal(checkJobText, Map.class, true);
-        for (Entry<String, String> entry : checkJobConfigMap.entrySet()) {
+        Map<String, String> yamlCheckResultMap = YamlEngine.unmarshal(yamlCheckResultMapText, Map.class, true);
+        for (Entry<String, String> entry : yamlCheckResultMap.entrySet()) {
             result.put(entry.getKey(), YamlDataConsistencyCheckResultSwapper.swapToObject(entry.getValue()));
         }
         return result;
     }
     
     @Override
-    public void persistCheckJobResult(final String jobId, final String checkJobId, final Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult) {
-        if (null == dataConsistencyCheckResult) {
-            log.warn("data consistency check is null, jobId {}, checkJobId {}", jobId, checkJobId);
+    public void persistCheckJobResult(final String jobId, final String checkJobId, final Map<String, DataConsistencyCheckResult> checkResultMap) {
+        if (null == checkResultMap) {
+            log.warn("checkResultMap is null, jobId {}, checkJobId {}", jobId, checkJobId);
             return;
         }
-        log.info("persist check job result '{}' for job {}", checkJobId, jobId);
-        Map<String, String> checkResultMap = new LinkedHashMap<>();
-        for (Entry<String, DataConsistencyCheckResult> entry : dataConsistencyCheckResult.entrySet()) {
-            YamlDataConsistencyCheckResult checkResult = new YamlDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
-            checkResultMap.put(entry.getKey(), YamlEngine.marshal(checkResult));
+        log.info("persist check job result for job {}", checkJobId);
+        Map<String, String> yamlCheckResultMap = new LinkedHashMap<>();
+        for (Entry<String, DataConsistencyCheckResult> entry : checkResultMap.entrySet()) {
+            YamlDataConsistencyCheckResult yamlCheckResult = new YamlDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
+            yamlCheckResultMap.put(entry.getKey(), YamlEngine.marshal(yamlCheckResult));
         }
-        repository.persist(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId), YamlEngine.marshal(checkResultMap));
+        repository.persist(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId), YamlEngine.marshal(yamlCheckResultMap));
+    }
+    
+    @Override
+    public void deleteCheckJobResult(final String jobId, final String checkJobId) {
+        log.info("deleteCheckJobResult, jobId={}, checkJobId={}", jobId, checkJobId);
+        repository.delete(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
similarity index 77%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
index dff0e6a1905..e5ab9d1c8db 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
 import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Pipeline job has already existed exception.
+ * Uncompleted consistency check job exists exception.
  */
-public final class PipelineJobHasAlreadyExistedException extends PipelineSQLException {
+public final class UncompletedConsistencyCheckJobExistsException extends PipelineSQLException {
     
     private static final long serialVersionUID = 2854259384634892428L;
     
-    public PipelineJobHasAlreadyExistedException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 86, "Job `%s` has already existed", jobId);
+    public UncompletedConsistencyCheckJobExistsException(final String jobId) {
+        super(XOpenSQLState.GENERAL_ERROR, 86, "Uncompelted consistency check job `%s` exists", jobId);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
index 05dd63bb70c..f0baaf647c8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
@@ -21,7 +21,7 @@ import lombok.Data;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 /**
- * Yaml data check job progress.
+ * Yaml data consistency check job progress.
  */
 @Data
 public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index 1df7d29a532..a2785126509 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -72,12 +72,14 @@ public final class PipelineDDLGenerator {
      */
     public String generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource,
                                    final String schemaName, final String sourceTableName, final String targetTableName, final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
-        log.info("generateLogicDDLSQL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName, sourceTableName, targetTableName);
+        log.info("generateLogicDDL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName, sourceTableName, targetTableName);
+        long startTimeMillis = System.currentTimeMillis();
         StringBuilder result = new StringBuilder();
         for (String each : CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource, schemaName, sourceTableName)) {
             Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each);
             queryContext.ifPresent(ddlSQL -> result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
         }
+        log.info("generateLogicDDL cost {} ms", System.currentTimeMillis() - startTimeMillis);
         return result.toString();
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 81a35c9f7e5..190bf24e353 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -118,7 +118,7 @@ public final class PipelineMetaDataNode {
     }
     
     /**
-     * Get check latest detailed result path.
+     * Get check latest job id path.
      *
      * @param jobId job id
      * @return check latest job id path
@@ -128,7 +128,7 @@ public final class PipelineMetaDataNode {
     }
     
     /**
-     * Get check latest result path.
+     * Get check job result path.
      *
      * @param jobId job id
      * @param checkJobId check job id
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index fd8354ab791..f14e56d3e71 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -73,11 +73,13 @@ public final class InventoryTaskSplitter {
      */
     public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemContext jobItemContext) {
         List<InventoryTask> result = new LinkedList<>();
+        long startTimeMillis = System.currentTimeMillis();
         PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
         for (InventoryDumperConfiguration each : splitDumperConfig(jobItemContext, dumperConfig)) {
             result.add(new InventoryTask(each, importerConfig, pipelineChannelCreator, jobItemContext.getDataSourceManager(), sourceDataSource, jobItemContext.getSourceMetaDataLoader(),
                     jobItemContext.getJobProcessContext().getInventoryDumperExecuteEngine(), jobItemContext.getJobProcessContext().getInventoryImporterExecuteEngine(), jobItemContext));
         }
+        log.info("splitInventoryData cost {} ms", System.currentTimeMillis() - startTimeMillis);
         return result;
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index d1ea79d9981..c58c67eaad7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourc
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializerFactory;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -47,11 +48,8 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Optional;
-import java.util.Set;
 
 /**
  * Pipeline job preparer utils.
@@ -59,8 +57,6 @@ import java.util.Set;
 @Slf4j
 public final class PipelineJobPreparerUtils {
     
-    private static final Set<String> INCREMENTAL_SUPPORTED_DATABASES = new HashSet<>(Arrays.asList("MySQL", "PostgreSQL", "openGauss"));
-    
     /**
      * Is incremental supported.
      *
@@ -68,8 +64,7 @@ public final class PipelineJobPreparerUtils {
      * @return true if supported, otherwise false
      */
     public static boolean isIncrementalSupported(final String databaseType) {
-        // TODO check by IncrementalDumperCreator SPI
-        return INCREMENTAL_SUPPORTED_DATABASES.contains(databaseType);
+        return IncrementalDumperCreatorFactory.findInstance(databaseType).isPresent();
     }
     
     /**
@@ -85,7 +80,9 @@ public final class PipelineJobPreparerUtils {
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
+        long startTimeMillis = System.currentTimeMillis();
         dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
+        log.info("prepareTargetSchema cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     /**
@@ -113,7 +110,9 @@ public final class PipelineJobPreparerUtils {
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
+        long startTimeMillis = System.currentTimeMillis();
         dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
+        log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     /**
@@ -135,7 +134,10 @@ public final class PipelineJobPreparerUtils {
         }
         String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
         DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-        return PositionInitializerFactory.getInstance(databaseType).init(dataSource, dumperConfig.getJobId());
+        long startTimeMillis = System.currentTimeMillis();
+        IngestPosition<?> result = PositionInitializerFactory.getInstance(databaseType).init(dataSource, dumperConfig.getJobId());
+        log.info("getIncrementalPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
+        return result;
     }
     
     /**
@@ -149,10 +151,12 @@ public final class PipelineJobPreparerUtils {
             log.info("source data source is empty, skip check");
             return;
         }
+        final long startTimeMillis = System.currentTimeMillis();
         DataSourceChecker dataSourceChecker = DataSourceCheckerFactory.getInstance(databaseType);
         dataSourceChecker.checkConnection(dataSources);
         dataSourceChecker.checkPrivilege(dataSources);
         dataSourceChecker.checkVariable(dataSources);
+        log.info("checkSourceDataSource cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     /**
@@ -168,8 +172,10 @@ public final class PipelineJobPreparerUtils {
             log.info("target data source is empty, skip check");
             return;
         }
+        long startTimeMillis = System.currentTimeMillis();
         dataSourceChecker.checkConnection(targetDataSources);
         dataSourceChecker.checkTargetTable(targetDataSources, importerConfig.getTableNameSchemaNameMapping(), importerConfig.getLogicTableNames());
+        log.info("checkTargetDataSource cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     /**
@@ -182,6 +188,7 @@ public final class PipelineJobPreparerUtils {
     public static void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
         DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
         PositionInitializer positionInitializer = PositionInitializerFactory.getInstance(databaseType.getType());
+        final long startTimeMillis = System.currentTimeMillis();
         log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType());
         if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
             ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig;
@@ -199,5 +206,6 @@ public final class PipelineJobPreparerUtils {
                 positionInitializer.destroy(dataSource, jobId);
             }
         }
+        log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 03889561674..9e7363970a0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -46,7 +46,7 @@ import java.util.concurrent.CompletableFuture;
  * Incremental task.
  */
 @Slf4j
-@ToString(exclude = {"incrementalDumperExecuteEngine", "importerExecuteEngine", "channel", "dumper", "importers", "taskProgress"})
+@ToString(exclude = {"incrementalExecuteEngine", "channel", "dumper", "importers", "taskProgress"})
 public final class IncrementalTask implements PipelineTask, AutoCloseable {
     
     @Getter
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index ea606b8e296..8d71eae5a7c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Consistency check job configuration changed processor.
+ * Consistency check changed job configuration processor.
  */
 @Slf4j
 public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 05dc69d7467..e00a4e6359c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -59,14 +59,13 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
         jobAPI.persistJobItemProgress(jobItemContext);
         String parentJobId = consistencyCheckJobConfig.getParentJobId();
         log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, parentJobId);
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(parentJobId, checkJobId);
         JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
         InventoryIncrementalJobPublicAPI jobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
         Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = Collections.emptyMap();
         try {
             dataConsistencyCheckResult = StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())
                     ? jobPublicAPI.dataConsistencyCheck(parentJobId)
-                    : jobPublicAPI.dataConsistencyCheck(parentJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProperties());
+                    : jobPublicAPI.dataConsistencyCheck(parentJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProps());
             status = JobStatus.FINISHED;
         } catch (final SQLWrapperException ex) {
             log.error("data consistency check failed", ex);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
index dca95957b5c..d8495582568 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
@@ -25,5 +25,4 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
  * Consistency check job API.
  */
 public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI, PipelineJobAPI, PipelineJobItemAPI {
-    
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 4ad73f6ee91..c2e88634d30 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -39,9 +39,9 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyFinishedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -63,28 +63,31 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     @Override
     protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
         ConsistencyCheckJobId jobId = (ConsistencyCheckJobId) pipelineJobId;
-        return jobId.getPipelineJobId() + jobId.getSequence();
+        return jobId.getParentJobId() + jobId.getSequence();
     }
     
     @Override
     public String createJobAndStart(final CreateConsistencyCheckJobParameter parameter) {
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
-        Optional<String> optional = repositoryAPI.getCheckLatestJobId(parameter.getJobId());
-        if (optional.isPresent()) {
-            PipelineJobItemProgress progress = getJobItemProgress(optional.get(), 0);
-            if (null != progress && JobStatus.FINISHED != progress.getStatus()) {
-                log.info("check job already existed and status isn't FINISHED, status {}", progress.getStatus());
-                throw new PipelineJobHasAlreadyExistedException(optional.get());
+        String parentJobId = parameter.getJobId();
+        Optional<String> checkLatestJobId = repositoryAPI.getCheckLatestJobId(parentJobId);
+        if (checkLatestJobId.isPresent()) {
+            PipelineJobItemProgress progress = getJobItemProgress(checkLatestJobId.get(), 0);
+            if (null == progress || JobStatus.FINISHED != progress.getStatus()) {
+                log.info("check job already exists and status is not FINISHED, progress={}", progress);
+                throw new UncompletedConsistencyCheckJobExistsException(checkLatestJobId.get());
             }
         }
-        int consistencyCheckVersionNew = optional.map(s -> ConsistencyCheckJobId.getSequence(s) + 1).orElse(0);
+        int sequence = checkLatestJobId.map(s -> ConsistencyCheckJobId.parseSequence(s) + 1).orElse(ConsistencyCheckJobId.MIN_SEQUENCE);
+        String result = marshalJobId(new ConsistencyCheckJobId(parentJobId, sequence));
+        repositoryAPI.persistCheckLatestJobId(parentJobId, result);
+        repositoryAPI.deleteCheckJobResult(parentJobId, result);
+        dropJob(result);
         YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration();
-        ConsistencyCheckJobId checkJobId = new ConsistencyCheckJobId(parameter.getJobId(), consistencyCheckVersionNew);
-        String result = marshalJobId(checkJobId);
         yamlConfig.setJobId(result);
-        yamlConfig.setParentJobId(parameter.getJobId());
+        yamlConfig.setParentJobId(parentJobId);
         yamlConfig.setAlgorithmTypeName(parameter.getAlgorithmTypeName());
-        yamlConfig.setAlgorithmProperties(parameter.getAlgorithmProps());
+        yamlConfig.setAlgorithmProps(parameter.getAlgorithmProps());
         ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig);
         start(jobConfig);
         return result;
@@ -108,7 +111,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     }
     
     @Override
-    public PipelineJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+    public ConsistencyCheckJobProgress getJobItemProgress(final String jobId, final int shardingItem) {
         String progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
         if (StringUtils.isBlank(progress)) {
             return null;
@@ -121,7 +124,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     
     @Override
     public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
-        ConsistencyCheckJobProgress jobProgress = (ConsistencyCheckJobProgress) getJobItemProgress(jobId, shardingItem);
+        ConsistencyCheckJobProgress jobProgress = getJobItemProgress(jobId, shardingItem);
         if (null == jobProgress) {
             log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
             return;
@@ -135,6 +138,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
         log.info("Start disable check job {}", jobId);
         PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
         ShardingSpherePreconditions.checkState(null == jobProgress || JobStatus.FINISHED != jobProgress.getStatus(), () -> new PipelineJobHasAlreadyFinishedException(jobId));
+        super.startDisabledJob(jobId);
     }
     
     @Override
@@ -170,21 +174,22 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     
     @Override
     public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
+        throw new UnsupportedOperationException();
     }
     
     @Override
     public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
-        return null;
+        throw new UnsupportedOperationException();
     }
     
     @Override
     public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
-        return null;
+        throw new UnsupportedOperationException();
     }
     
     @Override
     protected PipelineJobInfo getJobInfo(final String jobId) {
-        return null;
+        throw new UnsupportedOperationException();
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 9e8d3b2b87b..727d83dedb6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -32,26 +32,28 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
-    private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
+    public static final int MIN_SEQUENCE = 1;
     
-    private final String pipelineJobId;
+    private static final int MAX_SEQUENCE = 3;
+    
+    private final String parentJobId;
     
     private final int sequence;
     
-    public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final int sequence) {
+    public ConsistencyCheckJobId(final @NonNull String parentJobId, final int sequence) {
         super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
-        this.pipelineJobId = pipelineJobId;
-        this.sequence = sequence > MAX_CONSISTENCY_CHECK_VERSION ? 0 : sequence;
+        this.parentJobId = parentJobId;
+        this.sequence = sequence > MAX_SEQUENCE ? MIN_SEQUENCE : sequence;
     }
     
     /**
-     * Get consistency check version.
+     * Parse consistency check sequence.
      *
-     * @param consistencyCheckJobId consistency check job id.
+     * @param checkJobId consistency check job id
      * @return sequence
      */
-    public static int getSequence(final @NonNull String consistencyCheckJobId) {
-        String versionString = consistencyCheckJobId.substring(consistencyCheckJobId.length() - 1);
+    public static int parseSequence(final @NonNull String checkJobId) {
+        String versionString = checkJobId.substring(checkJobId.length() - 1);
         return Integer.parseInt(versionString);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index b3471ac39b4..fb5841d9fde 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -54,6 +54,6 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
     
     @Override
     public PipelineProcessContext getJobProcessContext() {
-        throw new UnsupportedOperationException("");
+        throw new UnsupportedOperationException();
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 58ba577aa8d..67fd3c59616 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -88,7 +88,9 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
     
     private void prepare(final MigrationJobItemContext jobItemContext) {
         try {
+            long startTimeMillis = System.currentTimeMillis();
             jobPreparer.prepare(jobItemContext);
+            log.info("prepare cost {} ms", System.currentTimeMillis() - startTimeMillis);
             // CHECKSTYLE:OFF
         } catch (final SQLException | RuntimeException ex) {
             // CHECKSTYLE:ON
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 9d3d1f91e2e..e88940af179 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -255,9 +255,25 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
     @Override
     public void rollback(final String jobId) throws SQLException {
         log.info("Rollback job {}", jobId);
+        final long startTimeMillis = System.currentTimeMillis();
+        dropCheckJobs(jobId);
         stop(jobId);
         cleanTempTableOnRollback(jobId);
         dropJob(jobId);
+        log.info("Rollback cost {} ms", System.currentTimeMillis() - startTimeMillis);
+    }
+    
+    private void dropCheckJobs(final String jobId) {
+        Collection<String> checkJobIds = PipelineAPIFactory.getGovernanceRepositoryAPI().listCheckJobIds(jobId);
+        if (checkJobIds.isEmpty()) {
+            return;
+        }
+        log.info("dropCheckJobs start...");
+        long startTimeMillis = System.currentTimeMillis();
+        for (String each : checkJobIds) {
+            dropJob(each);
+        }
+        log.info("dropCheckJobs cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     private void cleanTempTableOnRollback(final String jobId) throws SQLException {
@@ -281,8 +297,11 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
     public void commit(final String jobId) {
         checkModeConfig();
         log.info("Commit job {}", jobId);
+        final long startTimeMillis = System.currentTimeMillis();
+        dropCheckJobs(jobId);
         stop(jobId);
         dropJob(jobId);
+        log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index bf698ad74e0..d7c397c7f4d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -74,7 +74,6 @@ public final class MigrationJobPreparer {
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
-        // TODO check metadata
         if (PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType())) {
             initIncrementalTasks(jobItemContext);
             if (jobItemContext.isStopping()) {
@@ -84,6 +83,8 @@ public final class MigrationJobPreparer {
             }
         }
         initInventoryTasks(jobItemContext);
+        log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
+                jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
     }
     
     private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) throws SQLException {
@@ -94,8 +95,9 @@ public final class MigrationJobPreparer {
         if (null == JOB_API.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem())) {
             JOB_API.persistJobItemProgress(jobItemContext);
         }
+        long startTimeMillis = System.currentTimeMillis();
         if (lockContext.tryLock(lockDefinition, 180000)) {
-            log.info("try lock success, jobId={}, shardingItem={}", jobConfig.getJobId(), jobItemContext.getShardingItem());
+            log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
             try {
                 InventoryIncrementalJobItemProgress jobItemProgress = JOB_API.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
                 boolean prepareFlag = JobStatus.PREPARING.equals(jobItemProgress.getStatus()) || JobStatus.RUNNING.equals(jobItemProgress.getStatus())
@@ -111,7 +113,7 @@ public final class MigrationJobPreparer {
                     }
                 }
             } finally {
-                log.info("unlock, jobId={}, shardingItem={}", jobConfig.getJobId(), jobItemContext.getShardingItem());
+                log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
                 lockContext.unlock(lockDefinition);
             }
         }
@@ -143,15 +145,12 @@ public final class MigrationJobPreparer {
     }
     
     private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
-        log.info("initInventoryTasks, start...");
         InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
         PipelineColumnMetaData uniqueKeyColumn = jobItemContext.getJobConfig().getUniqueKeyColumn();
         inventoryDumperConfig.setUniqueKey(uniqueKeyColumn.getName());
         inventoryDumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
         InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperConfig, jobItemContext.getTaskConfig().getImporterConfig());
         jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
-        log.info("initInventoryTasks, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
-                jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
     }
     
     private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
index 5545cc94dfe..0d0a25a722f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
@@ -61,11 +61,12 @@ public final class ConsistencyCheckJobAPIImplTest {
         CreateConsistencyCheckJobParameter parameter = new CreateConsistencyCheckJobParameter(migrationJobId, null, null);
         String checkJobId = checkJobAPI.createJobAndStart(parameter);
         ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) checkJobAPI.getJobConfiguration(checkJobId);
-        String expectCheckJobId = "j0201j0101test0";
+        int expectedSequence = ConsistencyCheckJobId.MIN_SEQUENCE;
+        String expectCheckJobId = "j0201" + migrationJobId + expectedSequence;
         assertThat(jobConfig.getJobId(), is(expectCheckJobId));
         assertNull(jobConfig.getAlgorithmTypeName());
-        int consistencyCheckVersion = ConsistencyCheckJobId.getSequence(expectCheckJobId);
-        assertThat(consistencyCheckVersion, is(0));
+        int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
+        assertThat(sequence, is(expectedSequence));
     }
     
     @Test
@@ -75,11 +76,11 @@ public final class ConsistencyCheckJobAPIImplTest {
         CreateConsistencyCheckJobParameter parameter = new CreateConsistencyCheckJobParameter(jobId.get(), null, null);
         String checkJobId = checkJobAPI.createJobAndStart(parameter);
         PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(jobId.get(), checkJobId);
-        Map<String, DataConsistencyCheckResult> expectResult = Collections.singletonMap("t_order", new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(1, 1),
+        Map<String, DataConsistencyCheckResult> expectedCheckResult = Collections.singletonMap("t_order", new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(1, 1),
                 new DataConsistencyContentCheckResult(true)));
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(jobId.get(), checkJobId, expectResult);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(jobId.get(), checkJobId, expectedCheckResult);
         Map<String, DataConsistencyCheckResult> actualCheckResult = checkJobAPI.getLatestDataConsistencyCheckResult(jobId.get());
-        assertThat(actualCheckResult.size(), is(expectResult.size()));
-        assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(), is(expectResult.get("t_order").getContentCheckResult().isMatched()));
+        assertThat(actualCheckResult.size(), is(expectedCheckResult.size()));
+        assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(), is(expectedCheckResult.get("t_order").getContentCheckResult().isMatched()));
     }
 }