You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2022/12/01 06:49:02 UTC

[shardingsphere] branch master updated: Split pipeline scenario module and package (#22556)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 57a3bec7b64 Split pipeline scenario module and package (#22556)
57a3bec7b64 is described below

commit 57a3bec7b647c17d97a594ae856ae4233b86551a
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Thu Dec 1 14:48:55 2022 +0800

    Split pipeline scenario module and package (#22556)
    
    * Split scenario module
    
    * Other modules depend on scenario sub-modules
    
    * Decouple ConsistencyCheckJobAPIFactory in scenario-migration module
    
    * Split scenario package
    
    * Clean unused anltr config
    
    * Fix self review
---
 features/sharding/distsql/parser/pom.xml              | 12 ------------
 ...t.handler.PipelineChangedJobConfigurationProcessor | 19 -------------------
 .../scenario/{ => consistencycheck}/pom.xml           | 12 ++++--------
 .../consistencycheck/ConsistencyCheckJob.java         |  2 ++
 .../consistencycheck/ConsistencyCheckJobId.java       |  7 ++-----
 .../consistencycheck/api}/ConsistencyCheckJobAPI.java |  2 +-
 .../api}/ConsistencyCheckJobAPIFactory.java           |  2 +-
 .../api/impl}/ConsistencyCheckJobAPIImpl.java         |  7 ++++++-
 .../context}/ConsistencyCheckJobItemContext.java      |  2 +-
 ...sistencyCheckChangedJobConfigurationProcessor.java |  3 ++-
 .../task}/ConsistencyCheckTasksRunner.java            |  5 ++++-
 .../util}/ConsistencyCheckSequence.java               |  4 ++--
 ...ere.data.pipeline.api.ConsistencyCheckJobPublicAPI |  2 +-
 ...rdingsphere.data.pipeline.core.api.PipelineJobAPI} |  2 +-
 ....handler.PipelineChangedJobConfigurationProcessor} |  3 +--
 ...nario.consistencycheck.api.ConsistencyCheckJobAPI} |  2 +-
 kernel/data-pipeline/scenario/{ => migration}/pom.xml |  7 ++++---
 .../pipeline/scenario/migration/MigrationJob.java     |  6 ++++++
 .../pipeline/scenario/migration/MigrationJobId.java   |  0
 .../scenario/migration/api}/MigrationJobAPI.java      |  4 +++-
 .../migration/api}/MigrationJobAPIFactory.java        |  2 +-
 .../migration/api/impl}/MigrationJobAPIImpl.java      | 13 +++++++++----
 .../consistency}/MigrationDataConsistencyChecker.java |  3 ++-
 .../migration/config}/MigrationTaskConfiguration.java |  2 +-
 .../migration/context}/MigrationJobItemContext.java   |  3 ++-
 .../migration/context}/MigrationProcessContext.java   |  2 +-
 .../MigrationChangedJobConfigurationProcessor.java    |  4 +++-
 .../migration/prepare}/MigrationJobPreparer.java      |  6 +++++-
 ...data.pipeline.api.InventoryIncrementalJobPublicAPI |  2 +-
 ...dingsphere.data.pipeline.api.MigrationJobPublicAPI |  2 +-
 ...rdingsphere.data.pipeline.core.api.PipelineJobAPI} |  2 +-
 ....handler.PipelineChangedJobConfigurationProcessor} |  2 +-
 ...a.pipeline.scenario.migration.api.MigrationJobAPI} |  2 +-
 .../pipeline/core/job/PipelineJobIdUtilsTest.java     |  0
 kernel/data-pipeline/scenario/pom.xml                 | 17 +++++------------
 proxy/backend/pom.xml                                 |  7 ++++++-
 test/pipeline/pom.xml                                 |  7 ++++++-
 .../data/pipeline/api/PipelineAPIFactoryTest.java     |  4 ++--
 .../pipeline/api/PipelineJobPublicAPIFactoryTest.java |  4 ++--
 .../api/impl/GovernanceRepositoryAPIImplTest.java     |  4 ++--
 .../core/api/impl/ConsistencyCheckJobAPIImplTest.java | 12 ++++++------
 .../core/api/impl/MigrationJobAPIImplTest.java        |  6 +++---
 ...eChangedJobConfigurationProcessorFactoryTest.java} | 19 +++++++------------
 .../core/prepare/InventoryTaskSplitterTest.java       |  4 ++--
 .../data/pipeline/core/task/IncrementalTaskTest.java  |  2 +-
 .../data/pipeline/core/task/InventoryTaskTest.java    |  2 +-
 .../pipeline/core/util/JobConfigurationBuilder.java   |  2 +-
 .../data/pipeline/core/util/PipelineContextUtil.java  |  8 ++++----
 .../consistencycheck/ConsistencyCheckJobTest.java     |  1 +
 .../impl}/ConsistencyCheckJobAPIFactoryTest.java      |  3 ++-
 .../{ => util}/ConsistencyCheckSequenceTest.java      |  2 +-
 .../{ => api/impl}/MigrationJobAPIFactoryTest.java    |  3 ++-
 .../MigrationDataConsistencyCheckerTest.java          |  4 +++-
 test/pipeline/src/test/resources/logback-test.xml     |  2 +-
 54 files changed, 129 insertions(+), 132 deletions(-)

diff --git a/features/sharding/distsql/parser/pom.xml b/features/sharding/distsql/parser/pom.xml
index 3b8c238176c..292ba08ebeb 100644
--- a/features/sharding/distsql/parser/pom.xml
+++ b/features/sharding/distsql/parser/pom.xml
@@ -63,18 +63,6 @@
                             <visitor>true</visitor>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>antlr-migration</id>
-                        <goals>
-                            <goal>antlr4</goal>
-                        </goals>
-                        <configuration>
-                            <sourceDirectory>src/main/antlr4/migration/</sourceDirectory>
-                            <libDirectory>src/main/antlr4/imports/migration/</libDirectory>
-                            <listener>false</listener>
-                            <visitor>true</visitor>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
         </plugins>
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
deleted file mode 100644
index 2f4035228af..00000000000
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationChangedJobConfigurationProcessor
-org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckChangedJobConfigurationProcessor
diff --git a/kernel/data-pipeline/scenario/pom.xml b/kernel/data-pipeline/scenario/consistencycheck/pom.xml
similarity index 76%
copy from kernel/data-pipeline/scenario/pom.xml
copy to kernel/data-pipeline/scenario/consistencycheck/pom.xml
index 8f6ae22befd..e8e6124db63 100644
--- a/kernel/data-pipeline/scenario/pom.xml
+++ b/kernel/data-pipeline/scenario/consistencycheck/pom.xml
@@ -16,15 +16,16 @@
   ~ limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
+        <artifactId>shardingsphere-data-pipeline-scenario</artifactId>
         <groupId>org.apache.shardingsphere</groupId>
-        <artifactId>shardingsphere-data-pipeline</artifactId>
         <version>5.2.2-SNAPSHOT</version>
     </parent>
-    <artifactId>shardingsphere-data-pipeline-scenario</artifactId>
+    <artifactId>shardingsphere-data-pipeline-scenario-consistencycheck</artifactId>
     <name>${project.artifactId}</name>
     
     <dependencies>
@@ -33,10 +34,5 @@
             <artifactId>shardingsphere-data-pipeline-core</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.shardingsphere</groupId>
-            <artifactId>shardingsphere-sharding-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
     </dependencies>
 </project>
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
similarity index 92%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 3f7810248ae..cf4b61c3d8a 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -24,6 +24,8 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
similarity index 90%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 63d0c44d7d4..5f5158f29b0 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
 
 /**
  * Consistency check job id.
@@ -31,10 +32,6 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
-    public static final int MIN_SEQUENCE = 1;
-    
-    private static final int MAX_SEQUENCE = 3;
-    
     private final String parentJobId;
     
     private final int sequence;
@@ -50,7 +47,7 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
     public ConsistencyCheckJobId(final String parentJobId, final int sequence) {
         super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
         this.parentJobId = parentJobId;
-        this.sequence = sequence > MAX_SEQUENCE ? MIN_SEQUENCE : sequence;
+        this.sequence = sequence > ConsistencyCheckSequence.MAX_SEQUENCE ? ConsistencyCheckSequence.MIN_SEQUENCE : sequence;
     }
     
     /**
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
similarity index 98%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
index 3d328d8fffb..33cdb7261e4 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api;
 
 import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactory.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPIFactory.java
similarity index 98%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactory.java
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPIFactory.java
index 858d79a60b8..283f5ea29b0 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactory.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPIFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api;
 
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java
similarity index 96%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java
index 77635c72f06..0cbfb235968 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl;
 
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
@@ -45,6 +45,11 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedCon
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
similarity index 99%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index 501854a71ca..1a2fc807df9 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context;
 
 import lombok.Getter;
 import lombok.Setter;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
similarity index 96%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
index bb2359cd817..77dd7c346b7 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
similarity index 95%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index dfab5bf2383..b8e3f417e7e 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
 
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -34,6 +34,9 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 
 import java.sql.SQLException;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequence.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequence.java
similarity index 97%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequence.java
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequence.java
index 3a491d8ce40..8fb5059fe74 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequence.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequence.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util;
 
 import lombok.Getter;
 import lombok.ToString;
@@ -32,7 +32,7 @@ public final class ConsistencyCheckSequence {
     
     public static final int MIN_SEQUENCE = 1;
     
-    private static final int MAX_SEQUENCE = 3;
+    public static final int MAX_SEQUENCE = 3;
     
     /**
      * Get next sequence.
diff --git a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
similarity index 95%
rename from kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
index 4c63097dd84..6c5fa93a0f8 100644
--- a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPIImpl
diff --git a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
similarity index 95%
copy from kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI
copy to kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
index 4c63097dd84..6c5fa93a0f8 100644
--- a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPIImpl
diff --git a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
similarity index 88%
rename from kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
index 141ea167a06..bef8b3cefc8 100644
--- a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
@@ -15,5 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
-org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor.ConsistencyCheckChangedJobConfigurationProcessor
diff --git a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI
similarity index 95%
rename from kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI
index 4c63097dd84..6c5fa93a0f8 100644
--- a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPIImpl
diff --git a/kernel/data-pipeline/scenario/pom.xml b/kernel/data-pipeline/scenario/migration/pom.xml
similarity index 86%
copy from kernel/data-pipeline/scenario/pom.xml
copy to kernel/data-pipeline/scenario/migration/pom.xml
index 8f6ae22befd..f8db2df041a 100644
--- a/kernel/data-pipeline/scenario/pom.xml
+++ b/kernel/data-pipeline/scenario/migration/pom.xml
@@ -16,15 +16,16 @@
   ~ limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
+        <artifactId>shardingsphere-data-pipeline-scenario</artifactId>
         <groupId>org.apache.shardingsphere</groupId>
-        <artifactId>shardingsphere-data-pipeline</artifactId>
         <version>5.2.2-SNAPSHOT</version>
     </parent>
-    <artifactId>shardingsphere-data-pipeline-scenario</artifactId>
+    <artifactId>shardingsphere-data-pipeline-scenario-migration</artifactId>
     <name>${project.artifactId}</name>
     
     <dependencies>
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
similarity index 87%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index cdf071c6226..97e6eeef224 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -28,6 +28,12 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
similarity index 100%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
similarity index 90%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index efa627cb5e4..6839cd7f22e 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.api;
 
 import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactory.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPIFactory.java
similarity index 99%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactory.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPIFactory.java
index 0c58d6ac6c2..013af8a1552 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactory.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPIFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.api;
 
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java
similarity index 97%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java
index 2a9c508c921..e4c6a8d4b0d 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
@@ -63,7 +63,12 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSche
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractorFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
@@ -251,7 +256,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
         super.startDisabledJob(jobId);
         PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(jobId).ifPresent(optional -> {
             try {
-                ConsistencyCheckJobAPIFactory.getInstance().startDisabledJob(optional);
+                PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK).startDisabledJob(optional);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
@@ -264,7 +269,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
     public void stop(final String jobId) {
         PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(jobId).ifPresent(optional -> {
             try {
-                ConsistencyCheckJobAPIFactory.getInstance().stop(optional);
+                PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK).stop(optional);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
similarity index 98%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 506774046f5..ea0219ad8ec 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
@@ -35,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
similarity index 99%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
index 1cc50d54b85..06b7ca072b7 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.config;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
similarity index 98%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 52b8c2c858f..5abd142b8c5 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.context;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.Pipelin
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 
 import java.util.Collection;
 import java.util.LinkedList;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java
similarity index 99%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java
index 7cdde29e4c1..eb3e9dd7cf2 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.context;
 
 import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
similarity index 95%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
index f5918c62673..9f485f4365d 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
@@ -24,6 +24,8 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
 import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
diff --git a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
similarity index 96%
rename from kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 8bb8b44b40a..2014aa58185 100644
--- a/kernel/data-pipeline/scenario/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.prepare;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
@@ -37,6 +37,10 @@ import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerU
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.lock.LockContext;
diff --git a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
similarity index 89%
rename from kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
rename to kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
index 90857b277c4..147ed63180b 100644
--- a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
+++ b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl
diff --git a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
similarity index 89%
copy from kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
copy to kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
index 90857b277c4..147ed63180b 100644
--- a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
+++ b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl
diff --git a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
similarity index 89%
rename from kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
rename to kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
index 90857b277c4..147ed63180b 100644
--- a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
+++ b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl
diff --git a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
similarity index 86%
copy from kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
copy to kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
index 90857b277c4..6afc193bf77 100644
--- a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
+++ b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor.MigrationChangedJobConfigurationProcessor
diff --git a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI
similarity index 89%
rename from kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
rename to kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI
index 90857b277c4..147ed63180b 100644
--- a/kernel/data-pipeline/scenario/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
+++ b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl
+org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl
diff --git a/kernel/data-pipeline/scenario/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
similarity index 100%
rename from kernel/data-pipeline/scenario/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
rename to kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
diff --git a/kernel/data-pipeline/scenario/pom.xml b/kernel/data-pipeline/scenario/pom.xml
index 8f6ae22befd..4706839eb8d 100644
--- a/kernel/data-pipeline/scenario/pom.xml
+++ b/kernel/data-pipeline/scenario/pom.xml
@@ -25,18 +25,11 @@
         <version>5.2.2-SNAPSHOT</version>
     </parent>
     <artifactId>shardingsphere-data-pipeline-scenario</artifactId>
+    <packaging>pom</packaging>
     <name>${project.artifactId}</name>
     
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.shardingsphere</groupId>
-            <artifactId>shardingsphere-data-pipeline-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.shardingsphere</groupId>
-            <artifactId>shardingsphere-sharding-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
+    <modules>
+        <module>migration</module>
+        <module>consistencycheck</module>
+    </modules>
 </project>
diff --git a/proxy/backend/pom.xml b/proxy/backend/pom.xml
index a13b32afc14..d89528bfe1f 100644
--- a/proxy/backend/pom.xml
+++ b/proxy/backend/pom.xml
@@ -190,7 +190,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
-            <artifactId>shardingsphere-data-pipeline-scenario</artifactId>
+            <artifactId>shardingsphere-data-pipeline-scenario-migration</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-data-pipeline-scenario-consistencycheck</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
diff --git a/test/pipeline/pom.xml b/test/pipeline/pom.xml
index 4ab0895c660..4c5c9e38288 100644
--- a/test/pipeline/pom.xml
+++ b/test/pipeline/pom.xml
@@ -45,7 +45,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
-            <artifactId>shardingsphere-data-pipeline-scenario</artifactId>
+            <artifactId>shardingsphere-data-pipeline-scenario-migration</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-data-pipeline-scenario-consistencycheck</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
index 136871be3c2..613d82a122b 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index 21d62ecb150..f4859f279a0 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl;
 import org.junit.Test;
 
 import java.util.Collection;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index f743bddf34f..af27cd35ba9 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -35,8 +35,8 @@ import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 import org.junit.BeforeClass;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
index 9d7edd588f4..bf1f73e5112 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
@@ -28,13 +28,13 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPI;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckSequence;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 75fe65e125b..d64ff38dd71 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -35,9 +35,9 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineD
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
similarity index 63%
copy from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
copy to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
index 136871be3c2..e9c2fa7af81 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessorFactoryTest.java
@@ -15,26 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
 
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIImpl;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor.ConsistencyCheckChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor.MigrationChangedJobConfigurationProcessor;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class PipelineAPIFactoryTest {
+public final class PipelineChangedJobConfigurationProcessorFactoryTest {
     
     @Test
-    public void assertGetPipelineJobAPI() {
-        assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION), instanceOf(MigrationJobAPIImpl.class));
-    }
-    
-    @Test
-    public void assertGetConsistencyCheckJobAPI() {
-        assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.CONSISTENCY_CHECK), instanceOf(ConsistencyCheckJobAPIImpl.class));
+    public void assertGetInstance() {
+        assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(JobType.MIGRATION), instanceOf(MigrationChangedJobConfigurationProcessor.class));
+        assertThat(PipelineChangedJobConfigurationProcessorFactory.getInstance(JobType.CONSISTENCY_CHECK), instanceOf(ConsistencyCheckChangedJobConfigurationProcessor.class));
     }
 }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index e71e5e3906f..a09fbbfb081 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -31,8 +31,8 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index 426f51cd201..14b856233e7 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncr
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index d0d437882ee..e0b47cb8e5d 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryIncr
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 254f90c2e2f..0317b95a374 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -27,8 +27,8 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.yaml.metadata.YamlPipelineColumnMetaData;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 33c3e0f9c9e..5da9832addf 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -30,10 +30,10 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index 539254e74e7..a1e8376d414 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIFactoryTest.java
similarity index 90%
rename from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java
rename to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIFactoryTest.java
index a8cf8eeda01..16ab4d83079 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPIFactoryTest.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl;
 
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPIFactory;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequenceTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java
similarity index 99%
rename from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequenceTest.java
rename to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java
index 0be433f7240..5d94710b84c 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckSequenceTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/util/ConsistencyCheckSequenceTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util;
 
 import org.junit.Test;
 
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIFactoryTest.java
similarity index 91%
rename from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactoryTest.java
rename to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIFactoryTest.java
index f52fd187ddb..00fcafc8308 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPIFactoryTest.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
 
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPIFactory;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
similarity index 95%
rename from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
rename to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 8fa1b6f2f24..a3af156b8a6 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
@@ -26,6 +26,8 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDa
 import org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
diff --git a/test/pipeline/src/test/resources/logback-test.xml b/test/pipeline/src/test/resources/logback-test.xml
index ad329c3f139..7f384420624 100644
--- a/test/pipeline/src/test/resources/logback-test.xml
+++ b/test/pipeline/src/test/resources/logback-test.xml
@@ -24,7 +24,7 @@
     </appender>
     <logger name="org.apache.zookeeper" level="off" />
     <logger name="org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter" level="off" />
-    <logger name="org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl" level="off" />
+    <logger name="org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPIImpl" level="off" />
     <logger name="org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper" level="off" />
     
     <root>