Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/08/22 07:31:12 UTC

[shardingsphere] branch master updated: Refactor migration job and related impl with standalone source resource (#20316)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 203606c877f Refactor migration job and related impl with standalone source resource (#20316)
203606c877f is described below

commit 203606c877ff5f65620788dac48ee27df25fe6b3
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Aug 22 15:31:03 2022 +0800

    Refactor migration job and related impl with standalone source resource (#20316)
---
 ...hardingRuleAlteredJobConfigurationPreparer.java | 255 --------------------
 .../handler/update/MigrateTableUpdater.java        |  14 +-
 .../distsql/statement/MigrateTableStatement.java   |   2 +
 .../ShardingSpherePipelineDataSourceCreator.java   |   3 +-
 .../data/pipeline/api/MigrationJobPublicAPI.java   |   8 +
 .../pipeline/api/config/ImporterConfiguration.java |   3 +-
 .../api/config/ingest/DumperConfiguration.java     |   2 +-
 .../ingest/InventoryDumperConfiguration.java       |   1 -
 .../api/config/job/MigrationJobConfiguration.java  |  38 +--
 .../api/config/job/PipelineJobConfiguration.java   |   2 +-
 .../job/yaml/YamlMigrationJobConfiguration.java    |  26 +--
 .../yaml/YamlMigrationJobConfigurationSwapper.java |  14 +-
 .../job/yaml/YamlPipelineJobConfiguration.java     |   2 +-
 .../api/pojo/CreateMigrationJobParameter.java}     |  24 +-
 .../spi/ingest/position/PositionInitializer.java   |   6 +-
 .../RuleAlteredJobConfigurationPreparer.java       |  47 ----
 ...RuleAlteredJobConfigurationPreparerFactory.java |  44 ----
 .../api/config/MigrationJobConfigurationTest.java  |  15 +-
 ...RuleAlteredJobConfigurationPreparerFixture.java |  36 ---
 .../check/consistency/DataConsistencyChecker.java  |  12 +-
 ...DataMatchDataConsistencyCalculateAlgorithm.java |   4 +
 .../pipeline/core/execute/PipelineJobWorker.java   |   5 -
 .../core/prepare/PipelineJobPreparerUtils.java     |   9 +-
 .../datasource/AbstractDataSourcePreparer.java     |   1 +
 .../data/pipeline/core/util/SchemaTableUtil.java   |  93 ++++++++
 .../scenario/migration/MigrationJobAPIImpl.java    | 172 ++++++++++----
 .../scenario/migration/MigrationJobId.java         |   7 +-
 .../scenario/migration/MigrationJobPreparer.java   |  33 +--
 .../scenario/rulealtered/RuleAlteredJobWorker.java | 246 --------------------
 .../core/fixture/MigrationJobAPIFixture.java       |   5 +
 .../fixture/FixturePositionInitializer.java        |   2 +-
 .../pipeline/core/job/PipelineJobIdUtilsTest.java  |   5 +-
 .../mysql/ingest/MySQLPositionInitializer.java     |   2 +-
 .../mysql/ingest/MySQLPositionInitializerTest.java |   2 +-
 .../datasource/MySQLDataSourcePreparerTest.java    |   3 -
 .../ingest/OpenGaussPositionInitializer.java       |  27 +--
 .../opengauss/ingest/OpenGaussWalDumper.java       |   2 +-
 .../ingest/PostgreSQLPositionInitializer.java      |  17 +-
 .../postgresql/ingest/PostgreSQLWalDumper.java     |   3 +-
 .../ingest/PostgreSQLPositionInitializerTest.java  |  12 +-
 .../postgresql/ingest/PostgreSQLWalDumperTest.java |  22 +-
 .../rdl/rule/RuleDefinitionBackendHandler.java     |   8 +-
 .../pipeline/cases/base/BaseExtraSQLITCase.java    |  39 +---
 .../data/pipeline/cases/base/BaseITCase.java       | 257 ++++++++++-----------
 ...QLCommand.java => MigrationDistSQLCommand.java} |  38 ++-
 ...ScalingIT.java => MySQLMigrationGeneralIT.java} |  59 ++---
 ...ngIT.java => PostgreSQLMigrationGeneralIT.java} |  63 ++---
 ...alingIT.java => TextPrimaryKeyMigrationIT.java} |  49 ++--
 .../pipeline/cases/task/MySQLIncrementTask.java    |   2 +-
 .../cases/task/PostgreSQLIncrementTask.java        |  16 +-
 ...tainer.java => MigrationComposedContainer.java} |   4 +-
 .../pipeline/framework/watcher/ScalingWatcher.java |  14 +-
 .../src/test/resources/env/common/command.xml      | 103 +++------
 .../src/test/resources/logback-test.xml            |   1 +
 .../core/api/impl/MigrationJobAPIImplTest.java     |  13 +-
 .../consistency/DataConsistencyCheckerTest.java    |   4 +-
 .../core/fixture/FixturePositionInitializer.java   |   2 +-
 .../pipeline/core/task/IncrementalTaskTest.java    |   2 +-
 .../core/util/JobConfigurationBuilder.java         |  19 +-
 .../migration_sharding_sphere_jdbc_target.yaml     |  57 +++++
 .../resources/migration_standard_jdbc_source.yaml  |  29 +++
 61 files changed, 766 insertions(+), 1239 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
deleted file mode 100644
index e611ae7c40f..00000000000
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.sharding.data.pipeline;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
-import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
-import org.apache.shardingsphere.sharding.api.config.strategy.sharding.ComplexShardingStrategyConfiguration;
-import org.apache.shardingsphere.sharding.api.config.strategy.sharding.ShardingStrategyConfiguration;
-import org.apache.shardingsphere.sharding.api.config.strategy.sharding.StandardShardingStrategyConfiguration;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.rule.TableRule;
-import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
-import org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-/**
- * Sharding rule altered job configuration preparer.
- */
-@Slf4j
-public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAlteredJobConfigurationPreparer {
-    
-    @Override
-    public void extendJobConfiguration(final YamlMigrationJobConfiguration yamlJobConfig) {
-        Map<String, List<DataNode>> actualDataNodes = getActualDataNodes(new YamlMigrationJobConfigurationSwapper().swapToObject(yamlJobConfig));
-        yamlJobConfig.setJobShardingDataNodes(getJobShardingDataNodes(actualDataNodes));
-        yamlJobConfig.setLogicTables(getLogicTables(actualDataNodes.keySet()));
-        yamlJobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(actualDataNodes));
-        yamlJobConfig.setSchemaTablesMap(getSchemaTablesMap(yamlJobConfig.getDatabaseName(), actualDataNodes.keySet()));
-    }
-    
-    private static Map<String, List<DataNode>> getActualDataNodes(final MigrationJobConfiguration jobConfig) {
-        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
-        ShardingSpherePipelineDataSourceConfiguration source = (ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig;
-        ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
-        // TODO InstanceContext should not null
-        ShardingRule shardingRule = new ShardingRule(sourceRuleConfig, source.getRootConfig().getDataSources().keySet(), null);
-        Map<String, TableRule> tableRules = shardingRule.getTableRules();
-        Map<String, List<DataNode>> result = new LinkedHashMap<>();
-        Set<String> reShardNeededTables = new HashSet<>(jobConfig.getAlteredRuleYamlClassNameTablesMap().get(YamlShardingRuleConfiguration.class.getName()));
-        for (Entry<String, TableRule> entry : tableRules.entrySet()) {
-            if (reShardNeededTables.contains(entry.getKey())) {
-                result.put(entry.getKey(), entry.getValue().getActualDataNodes());
-            }
-        }
-        return result;
-    }
-    
-    private List<String> getJobShardingDataNodes(final Map<String, List<DataNode>> actualDataNodes) {
-        List<String> result = new LinkedList<>();
-        Map<String, Map<String, List<DataNode>>> groupedDataSourceDataNodesMap = groupDataSourceDataNodesMapByDataSourceName(actualDataNodes);
-        for (Map<String, List<DataNode>> each : groupedDataSourceDataNodesMap.values()) {
-            List<JobDataNodeEntry> dataNodeEntries = new ArrayList<>(each.size());
-            for (Entry<String, List<DataNode>> entry : each.entrySet()) {
-                dataNodeEntries.add(new JobDataNodeEntry(entry.getKey(), entry.getValue()));
-            }
-            result.add(new JobDataNodeLine(dataNodeEntries).marshal());
-        }
-        return result;
-    }
-    
-    private Map<String, Map<String, List<DataNode>>> groupDataSourceDataNodesMapByDataSourceName(final Map<String, List<DataNode>> actualDataNodes) {
-        Map<String, Map<String, List<DataNode>>> result = new LinkedHashMap<>();
-        for (Entry<String, List<DataNode>> entry : actualDataNodes.entrySet()) {
-            for (DataNode each : entry.getValue()) {
-                Map<String, List<DataNode>> groupedDataNodesMap = result.computeIfAbsent(each.getDataSourceName(), key -> new LinkedHashMap<>());
-                groupedDataNodesMap.computeIfAbsent(entry.getKey(), key -> new LinkedList<>()).add(each);
-            }
-        }
-        return result;
-    }
-    
-    private static String getLogicTables(final Set<String> logicTables) {
-        return String.join(",", logicTables);
-    }
-    
-    private static String getTablesFirstDataNodes(final Map<String, List<DataNode>> actualDataNodes) {
-        List<JobDataNodeEntry> dataNodeEntries = new ArrayList<>(actualDataNodes.size());
-        for (Entry<String, List<DataNode>> entry : actualDataNodes.entrySet()) {
-            dataNodeEntries.add(new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1)));
-        }
-        return new JobDataNodeLine(dataNodeEntries).marshal();
-    }
-    
-    private static Map<String, List<String>> getSchemaTablesMap(final String databaseName, final Set<String> logicTables) {
-        // TODO get by search_path
-        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName);
-        Map<String, List<String>> result = new LinkedHashMap<>();
-        database.getSchemas().forEach((schemaName, schema) -> {
-            for (String each : schema.getAllTableNames()) {
-                if (!logicTables.contains(each)) {
-                    continue;
-                }
-                result.computeIfAbsent(schemaName, unused -> new LinkedList<>()).add(each);
-            }
-        });
-        log.info("getSchemaTablesMap, result={}", result);
-        return result;
-    }
-    
-    @Override
-    public TaskConfiguration createTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
-        ShardingSpherePipelineDataSourceConfiguration sourceConfig = getSourceConfiguration(jobConfig);
-        ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
-        Map<String, DataSourceProperties> dataSourcePropsMap = new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceConfig.getRootConfig());
-        JobDataNodeLine dataNodeLine = JobDataNodeLine.unmarshal(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
-        String dataSourceName = dataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
-        Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
-        for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
-            for (DataNode dataNode : each.getDataNodes()) {
-                tableNameMap.put(new ActualTableName(dataNode.getTableName()), new LogicTableName(each.getLogicTableName()));
-            }
-        }
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
-        DumperConfiguration dumperConfig = createDumperConfiguration(jobConfig.getDatabaseName(), dataSourceName,
-                dataSourcePropsMap.get(dataSourceName).getAllLocalProperties(), tableNameMap, tableNameSchemaNameMapping);
-        Optional<ShardingRuleConfiguration> targetRuleConfig = getTargetRuleConfiguration(jobConfig);
-        Set<LogicTableName> reShardNeededTables = jobConfig.splitLogicTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
-        Map<LogicTableName, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig), reShardNeededTables);
-        ImporterConfiguration importerConfig = createImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, tableNameSchemaNameMapping);
-        TaskConfiguration result = new TaskConfiguration(dumperConfig, importerConfig);
-        log.info("createTaskConfiguration, dataSourceName={}, result={}", dataSourceName, result);
-        return result;
-    }
-    
-    private static ShardingSpherePipelineDataSourceConfiguration getSourceConfiguration(final MigrationJobConfiguration jobConfig) {
-        return (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
-    }
-    
-    private static Optional<ShardingRuleConfiguration> getTargetRuleConfiguration(final MigrationJobConfiguration jobConfig) {
-        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
-        if (!(targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration)) {
-            return Optional.empty();
-        }
-        ShardingSpherePipelineDataSourceConfiguration target = (ShardingSpherePipelineDataSourceConfiguration) targetDataSourceConfig;
-        return Optional.of(ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(target.getRootConfig().getRules()));
-    }
-    
-    private static Map<LogicTableName, Set<String>> getShardingColumnsMap(final ShardingRuleConfiguration shardingRuleConfig, final Set<LogicTableName> reShardNeededTables) {
-        Set<String> defaultDatabaseShardingColumns = extractShardingColumns(shardingRuleConfig.getDefaultDatabaseShardingStrategy());
-        Set<String> defaultTableShardingColumns = extractShardingColumns(shardingRuleConfig.getDefaultTableShardingStrategy());
-        Map<LogicTableName, Set<String>> result = new ConcurrentHashMap<>();
-        for (ShardingTableRuleConfiguration each : shardingRuleConfig.getTables()) {
-            LogicTableName logicTableName = new LogicTableName(each.getLogicTable());
-            if (!reShardNeededTables.contains(logicTableName)) {
-                continue;
-            }
-            Set<String> shardingColumns = new HashSet<>();
-            shardingColumns.addAll(null == each.getDatabaseShardingStrategy() ? defaultDatabaseShardingColumns : extractShardingColumns(each.getDatabaseShardingStrategy()));
-            shardingColumns.addAll(null == each.getTableShardingStrategy() ? defaultTableShardingColumns : extractShardingColumns(each.getTableShardingStrategy()));
-            result.put(logicTableName, shardingColumns);
-        }
-        for (ShardingAutoTableRuleConfiguration each : shardingRuleConfig.getAutoTables()) {
-            LogicTableName logicTableName = new LogicTableName(each.getLogicTable());
-            if (!reShardNeededTables.contains(logicTableName)) {
-                continue;
-            }
-            ShardingStrategyConfiguration shardingStrategy = each.getShardingStrategy();
-            Set<String> shardingColumns = new HashSet<>(extractShardingColumns(shardingStrategy));
-            result.put(logicTableName, shardingColumns);
-        }
-        return result;
-    }
-    
-    private static Set<String> extractShardingColumns(final ShardingStrategyConfiguration shardingStrategy) {
-        if (shardingStrategy instanceof StandardShardingStrategyConfiguration) {
-            return new HashSet<>(Collections.singleton(((StandardShardingStrategyConfiguration) shardingStrategy).getShardingColumn()));
-        }
-        if (shardingStrategy instanceof ComplexShardingStrategyConfiguration) {
-            return new HashSet<>(Arrays.asList(((ComplexShardingStrategyConfiguration) shardingStrategy).getShardingColumns().split(",")));
-        }
-        return Collections.emptySet();
-    }
-    
-    private static DumperConfiguration createDumperConfiguration(final String databaseName, final String dataSourceName, final Map<String, Object> props,
-                                                                 final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
-        DumperConfiguration result = new DumperConfiguration();
-        result.setDatabaseName(databaseName);
-        result.setDataSourceName(dataSourceName);
-        result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(YamlEngine.marshal(props)));
-        result.setTableNameMap(tableNameMap);
-        result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
-        return result;
-    }
-    
-    private static ImporterConfiguration createImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
-                                                                     final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
-        PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        int retryTimes = jobConfig.getRetryTimes();
-        int concurrency = jobConfig.getConcurrency();
-        return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, retryTimes, concurrency);
-    }
-    
-    private static Map<LogicTableName, Set<String>> unmodifiable(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
-        Map<LogicTableName, Set<String>> result = new HashMap<>(shardingColumnsMap.size());
-        for (Entry<LogicTableName, Set<String>> entry : shardingColumnsMap.entrySet()) {
-            result.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
-        }
-        return Collections.unmodifiableMap(result);
-    }
-}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index b90a65aa39d..45771d65713 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -17,22 +17,32 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
 import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
+import org.apache.shardingsphere.sharding.yaml.swapper.YamlShardingRuleConfigurationSwapper;
 
 /**
  * Migrate table updater.
  */
+@Slf4j
 public final class MigrateTableUpdater implements RALUpdater<MigrateTableStatement> {
     
     private static final MigrationJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
+    private static final YamlShardingRuleConfigurationSwapper SHARDING_RULE_CONFIG_SWAPPER = new YamlShardingRuleConfigurationSwapper();
+    
     @Override
     public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
-        // TODO implement migrate table
-        JOB_API.getType();
+        log.info("start migrate job by {}", sqlStatement);
+        String targetDatabaseName = ObjectUtils.defaultIfNull(sqlStatement.getTargetDatabaseName(), databaseName);
+        CreateMigrationJobParameter createMigrationJobParameter = new CreateMigrationJobParameter(sqlStatement.getSourceDatabaseName(), sqlStatement.getSourceTableName(),
+                targetDatabaseName, sqlStatement.getTargetTableName());
+        JOB_API.createJobAndStart(createMigrationJobParameter);
     }
     
     @Override
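The updater now delegates directly to the migration job API: it builds a CreateMigrationJobParameter from the statement, defaults the target database to the current one when the statement does not name it, and calls createJobAndStart. A minimal invocation sketch (illustrative only; the MigrateTableStatement argument order is assumed from its field declarations, and the resource/table names are placeholders):

    // Hypothetical names; argument order assumed to be
    // (sourceDatabaseName, sourceTableName, targetDatabaseName, targetTableName).
    MigrateTableStatement statement = new MigrateTableStatement("source_ds", "t_order", "sharding_db", "t_order");
    new MigrateTableUpdater().executeUpdate("sharding_db", statement);
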
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/MigrateTableStatement.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/MigrateTableStatement.java
index 56d4c5bae8c..c03314798e1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/MigrateTableStatement.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/MigrateTableStatement.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.migration.distsql.statement;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.ToString;
 import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
 
 /**
@@ -26,6 +27,7 @@ import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableS
  */
 @RequiredArgsConstructor
 @Getter
+@ToString
 public final class MigrateTableStatement extends UpdatableScalingRALStatement {
     
     private final String sourceDatabaseName;
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index 908b5460d2f..c3ecd5cecc2 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -44,11 +44,10 @@ public final class ShardingSpherePipelineDataSourceCreator implements PipelineDa
         YamlRootConfiguration rootConfig = (YamlRootConfiguration) pipelineDataSourceConfig;
         YamlShardingRuleConfiguration shardingRuleConfig = ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(rootConfig.getRules());
         enableRangeQueryForInline(shardingRuleConfig);
-        String databaseName = null;
         Map<String, DataSource> dataSourceMap = new YamlDataSourceConfigurationSwapper().swapToDataSources(rootConfig.getDataSources(), false);
         Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(rootConfig.getRules());
         try {
-            return ShardingSphereDataSourceFactory.createDataSource(databaseName, dataSourceMap, ruleConfigs, null);
+            return ShardingSphereDataSourceFactory.createDataSource(rootConfig.getDatabaseName(), dataSourceMap, ruleConfigs, null);
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
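Previously the database name passed to ShardingSphereDataSourceFactory was hard-coded to null; it is now read from the YAML root configuration, so the logical database name declared in the pipeline data source YAML is preserved. A hedged sketch of the effect (the YamlEngine.unmarshal call is an assumption about the surrounding API, and yamlText is a placeholder):

    // Hypothetical YAML text declaring "databaseName: sharding_db".
    YamlRootConfiguration rootConfig = YamlEngine.unmarshal(yamlText, YamlRootConfiguration.class);
    // The declared name now reaches the created data source instead of null.
    DataSource dataSource = ShardingSphereDataSourceFactory.createDataSource(
            rootConfig.getDatabaseName(), dataSourceMap, ruleConfigs, null);
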
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index a31bd8aa037..a1f4045dced 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
@@ -126,4 +127,11 @@ public interface MigrationJobPublicAPI extends PipelineJobPublicAPI, RequiredSPI
      * @param resourceNames resource names
      */
     void dropMigrationSourceResources(Collection<String> resourceNames);
+    
+    /**
+     * Create migration job configuration and start the job.
+     *
+     * @param parameter create migration job parameter
+     */
+    void createJobAndStart(CreateMigrationJobParameter parameter);
 }
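createJobAndStart pairs with the CreateMigrationJobParameter POJO introduced later in this diff (source resource name, source table, target database, target table). A usage sketch with placeholder names:

    MigrationJobPublicAPI jobAPI = PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
    // Placeholder resource/table names; the parameter mirrors the MIGRATE TABLE operands.
    CreateMigrationJobParameter parameter = new CreateMigrationJobParameter(
            "source_ds", "t_order", "sharding_db", "t_order");
    jobAPI.createJobAndStart(parameter);
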
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
index a5dbba84466..d451719ecf2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -71,7 +72,7 @@ public final class ImporterConfiguration {
      * @return sharding columns
      */
     public Set<String> getShardingColumns(final String logicTableName) {
-        return shardingColumnsMap.get(new LogicTableName(logicTableName));
+        return ObjectUtils.defaultIfNull(shardingColumnsMap.get(new LogicTableName(logicTableName)), Collections.emptySet());
     }
     
     /**
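getShardingColumns is now null-safe: a logic table with no configured sharding columns yields an empty set instead of null. ObjectUtils.defaultIfNull (commons-lang3) returns its first argument unless that argument is null, in which case it returns the default. Illustration (table name is a placeholder):

    // Returns the looked-up set, or an immutable empty set when the key is absent.
    Set<String> columns = ObjectUtils.defaultIfNull(
            shardingColumnsMap.get(new LogicTableName("t_absent")), Collections.emptySet());
    // columns is never null, so callers can iterate without a guard.
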
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 20e32724c98..e47aa2be450 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -38,7 +38,7 @@ import java.util.Map;
 // TODO fields final
 public class DumperConfiguration {
     
-    private String databaseName;
+    private String jobId;
     
     private String dataSourceName;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index c1f33e2cc54..9340b0e7208 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -46,7 +46,6 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
     private JobRateLimitAlgorithm rateLimitAlgorithm;
     
     public InventoryDumperConfiguration(final DumperConfiguration dumperConfig) {
-        setDatabaseName(dumperConfig.getDatabaseName());
         setDataSourceName(dumperConfig.getDataSourceName());
         setDataSourceConfig(dumperConfig.getDataSourceConfig());
         setTableNameMap(dumperConfig.getTableNameMap());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
index d3d77cb0601..e55cb062886 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
@@ -17,9 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.api.config.job;
 
-import com.google.common.base.Splitter;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 
@@ -32,15 +32,14 @@ import java.util.Map;
 @RequiredArgsConstructor
 @Getter
 @Slf4j
+@ToString(exclude = {"source", "target", "schemaTablesMap"})
 public final class MigrationJobConfiguration implements PipelineJobConfiguration {
     
     private final String jobId;
     
-    private final String databaseName;
+    private final String targetDatabaseName;
     
-    private final Integer activeVersion;
-    
-    private final Integer newVersion;
+    private final String sourceDataSourceName;
     
     private final String sourceDatabaseType;
     
@@ -50,17 +49,14 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration
     
     private final PipelineDataSourceConfiguration target;
     
-    /**
-     * Map{altered rule yaml class name, re-shard needed table names}.
-     */
-    private final Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
-    
     /**
      * Map{schema name, logic table names}.
      */
     private final Map<String, List<String>> schemaTablesMap;
     
-    private final String logicTables;
+    private final String sourceTableName;
+    
+    private final String targetTableName;
     
     /**
      * Collection of each logic table's first data node.
@@ -83,24 +79,6 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration
      * @return job sharding count
      */
     public int getJobShardingCount() {
-        return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
-    }
-    
-    /**
-     * Split {@linkplain #logicTables} to logic table names.
-     *
-     * @return logic table names
-     */
-    public List<String> splitLogicTableNames() {
-        return Splitter.on(',').splitToList(logicTables);
-    }
-    
-    @Override
-    public String toString() {
-        return "MigrationJobConfiguration{"
-                + "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\''
-                + ", activeVersion=" + activeVersion + ", newVersion=" + newVersion
-                + ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\''
-                + '}';
+        return 1;
     }
 }
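With a standalone source resource, one migration job covers exactly one table, so getJobShardingCount() is now the constant 1 and the data-node-based computation is gone. The hand-written toString is replaced by Lombok's @ToString with source, target, and schemaTablesMap excluded, which keeps data source parameters (potentially containing credentials) out of log output. Illustrative Lombok behavior, not code from this commit:

    @lombok.ToString(exclude = "secret")
    class Example {
        String name = "job-1";
        String secret = "p4ssw0rd";
    }
    // new Example().toString() -> "Example(name=job-1)": excluded fields never reach logs.
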
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
index f1f97320b0c..336c996e0eb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
@@ -34,7 +34,7 @@ public interface PipelineJobConfiguration {
      *
      * @return database name
      */
-    String getDatabaseName();
+    String getTargetDatabaseName();
     
     /**
      * Get job sharding count.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
index 3971d8b7087..9d17d6817c6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
 import com.google.common.base.Preconditions;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 
@@ -32,15 +33,14 @@ import java.util.Map;
 @Getter
 @Setter
 @Slf4j
+@ToString(exclude = {"source", "target", "schemaTablesMap"})
 public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfiguration {
     
     private String jobId;
     
-    private String databaseName;
+    private String targetDatabaseName;
     
-    private Integer activeVersion;
-    
-    private Integer newVersion;
+    private String sourceDataSourceName;
     
     private String sourceDatabaseType;
     
@@ -50,17 +50,14 @@ public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfi
     
     private YamlPipelineDataSourceConfiguration target;
     
-    /**
-     * Map{altered rule yaml class name, re-shard needed table names}.
-     */
-    private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
-    
     /**
      * Map{schema name, logic table names}.
      */
     private Map<String, List<String>> schemaTablesMap;
     
-    private String logicTables;
+    private String sourceTableName;
+    
+    private String targetTableName;
     
     /**
      * Collection of each logic table's first data node.
@@ -102,13 +99,4 @@ public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfi
         Preconditions.checkNotNull(yamlConfig.getType());
         Preconditions.checkNotNull(yamlConfig.getParameter());
     }
-    
-    @Override
-    public String toString() {
-        return "YamlMigrationJobConfiguration{"
-                + "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\''
-                + ", activeVersion=" + activeVersion + ", newVersion=" + newVersion
-                + ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\''
-                + '}';
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
index 9c28e438ccd..31c7e95f356 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
@@ -36,16 +36,15 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
     public YamlMigrationJobConfiguration swapToYamlConfiguration(final MigrationJobConfiguration data) {
         YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
         result.setJobId(data.getJobId());
-        result.setDatabaseName(data.getDatabaseName());
-        result.setActiveVersion(data.getActiveVersion());
-        result.setNewVersion(data.getNewVersion());
+        result.setTargetDatabaseName(data.getTargetDatabaseName());
         result.setSourceDatabaseType(data.getSourceDatabaseType());
+        result.setSourceTableName(data.getSourceTableName());
+        result.setSourceDataSourceName(data.getSourceDataSourceName());
         result.setTargetDatabaseType(data.getTargetDatabaseType());
         result.setSource(dataSourceConfigSwapper.swapToYamlConfiguration(data.getSource()));
         result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
-        result.setAlteredRuleYamlClassNameTablesMap(data.getAlteredRuleYamlClassNameTablesMap());
         result.setSchemaTablesMap(data.getSchemaTablesMap());
-        result.setLogicTables(data.getLogicTables());
+        result.setTargetTableName(data.getTargetTableName());
         result.setTablesFirstDataNodes(data.getTablesFirstDataNodes());
         result.setJobShardingDataNodes(data.getJobShardingDataNodes());
         result.setConcurrency(data.getConcurrency());
@@ -55,11 +54,10 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
     
     @Override
     public MigrationJobConfiguration swapToObject(final YamlMigrationJobConfiguration yamlConfig) {
-        return new MigrationJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(),
-                yamlConfig.getActiveVersion(), yamlConfig.getNewVersion(),
+        return new MigrationJobConfiguration(yamlConfig.getJobId(), yamlConfig.getTargetDatabaseName(), yamlConfig.getSourceDataSourceName(),
                 yamlConfig.getSourceDatabaseType(), yamlConfig.getTargetDatabaseType(),
                 dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
-                yamlConfig.getAlteredRuleYamlClassNameTablesMap(), yamlConfig.getSchemaTablesMap(), yamlConfig.getLogicTables(),
+                yamlConfig.getSchemaTablesMap(), yamlConfig.getSourceTableName(), yamlConfig.getTargetTableName(),
                 yamlConfig.getTablesFirstDataNodes(), yamlConfig.getJobShardingDataNodes(),
                 yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
     }
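The swapper now maps the standalone-source fields one-to-one (targetDatabaseName, sourceDataSourceName, sourceTableName, targetTableName) in place of the removed version and rule-alteration fields. A round-trip sketch using the two public methods shown above:

    YamlMigrationJobConfigurationSwapper swapper = new YamlMigrationJobConfigurationSwapper();
    MigrationJobConfiguration jobConfig = swapper.swapToObject(yamlJobConfig);
    // Swapping back should reproduce the same YAML-level fields.
    YamlMigrationJobConfiguration roundTripped = swapper.swapToYamlConfiguration(jobConfig);
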
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
index c0ab0ca640b..89b0c9716dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
@@ -36,5 +36,5 @@ public interface YamlPipelineJobConfiguration extends YamlConfiguration {
      *
      * @return database name
      */
-    String getDatabaseName();
+    String getTargetDatabaseName();
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactoryTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateMigrationJobParameter.java
similarity index 59%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactoryTest.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateMigrationJobParameter.java
index 5fdccecd524..b7183d87668 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactoryTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateMigrationJobParameter.java
@@ -15,18 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
+package org.apache.shardingsphere.data.pipeline.api.pojo;
 
-import org.apache.shardingsphere.data.pipeline.spi.fixture.RuleAlteredJobConfigurationPreparerFixture;
-import org.junit.Test;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-
-public final class RuleAlteredJobConfigurationPreparerFactoryTest {
+@Data
+@RequiredArgsConstructor
+public final class CreateMigrationJobParameter {
+    
+    private final String sourceResourceName;
+    
+    private final String sourceTableName;
+    
+    private final String targetDatabaseName;
     
-    @Test
-    public void assertGetInstance() {
-        assertThat(RuleAlteredJobConfigurationPreparerFactory.getInstance(), instanceOf(RuleAlteredJobConfigurationPreparerFixture.class));
-    }
+    private final String targetTableName;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
index 05b9a390718..9ec5cf0d1c9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
@@ -34,10 +34,11 @@ public interface PositionInitializer extends TypedSPI {
      * Init position by data source.
      *
      * @param dataSource data source
+     * @param slotNameSuffix slot name suffix
      * @return position
      * @throws SQLException SQL exception
      */
-    IngestPosition<?> init(DataSource dataSource) throws SQLException;
+    IngestPosition<?> init(DataSource dataSource, String slotNameSuffix) throws SQLException;
     
     /**
      * Init position by string data.
@@ -51,8 +52,9 @@ public interface PositionInitializer extends TypedSPI {
      * Clean up by data source if necessary.
      *
      * @param dataSource data source
+     * @param slotNameSuffix slot name suffix
      * @throws SQLException SQL exception
      */
-    default void destroy(DataSource dataSource) throws SQLException {
+    default void destroy(DataSource dataSource, final String slotNameSuffix) throws SQLException {
     }
 }
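init and destroy now take a slotNameSuffix so database-specific initializers (such as the PostgreSQL and openGauss WAL implementations changed below) can create and drop per-job replication slots without colliding across concurrent jobs; initializers that need no slot can ignore the argument. A hedged implementation sketch, assuming the PlaceholderPosition class and the string-based init overload of this SPI are available:

    public final class ExamplePositionInitializer implements PositionInitializer {
        
        @Override
        public IngestPosition<?> init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
            // A real implementation might create a replication slot named with the suffix.
            return new PlaceholderPosition();
        }
        
        @Override
        public IngestPosition<?> init(final String data) {
            return new PlaceholderPosition();
        }
        
        @Override
        public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
            // Drop whatever slot init created for this suffix, if any.
        }
        
        @Override
        public String getType() {
            return "EXAMPLE";
        }
    }
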
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
deleted file mode 100644
index 2137bfd6f82..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
-
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
-
-/**
- * Rule altered job configuration preparer.
- */
-public interface RuleAlteredJobConfigurationPreparer extends RequiredSPI {
-    
-    /**
-     * Extend job configuration.
-     *
-     * @param yamlJobConfig YAML job configuration
-     */
-    void extendJobConfiguration(YamlMigrationJobConfiguration yamlJobConfig);
-    
-    /**
-     * Create task configuration, used by underlying scheduler.
-     *
-     * @param jobConfig job configuration
-     * @param jobShardingItem job sharding item
-     * @param pipelineProcessConfig pipeline process configuration
-     * @return task configuration
-     */
-    TaskConfiguration createTaskConfiguration(MigrationJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactory.java
deleted file mode 100644
index 6d26d29fd48..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
-
-/**
- * Rule altered job configuration preparer factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class RuleAlteredJobConfigurationPreparerFactory {
-    
-    static {
-        ShardingSphereServiceLoader.register(RuleAlteredJobConfigurationPreparer.class);
-    }
-    
-    /**
-     * Get instance of rule altered job configuration preparer.
-     * 
-     * @return got instance
-     */
-    // TODO RuleAlteredJobConfigurationPreparer should be TypedSPI or OrderedSPI
-    public static RuleAlteredJobConfigurationPreparer getInstance() {
-        return RequiredSPIRegistry.getRegisteredService(RuleAlteredJobConfigurationPreparer.class);
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java
index 504ce956e8a..aaac7418886 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java
@@ -22,8 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigration
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
 import org.junit.Test;
 
-import java.util.Arrays;
-
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -35,22 +33,13 @@ public final class MigrationJobConfigurationTest {
     public void assertGetJobShardingCountByNull() {
         YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
         MigrationJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
-        assertThat(jobConfig.getJobShardingCount(), is(0));
+        assertThat(jobConfig.getJobShardingCount(), is(1));
     }
     
     @Test
     public void assertGetJobShardingCount() {
         YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
-        yamlJobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
-        MigrationJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
-        assertThat(jobConfig.getJobShardingCount(), is(2));
-    }
-    
-    @Test
-    public void assertSplitLogicTableNames() {
-        YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
-        yamlJobConfig.setLogicTables("foo_tbl,bar_tbl");
         MigrationJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
-        assertThat(jobConfig.splitLogicTableNames(), is(Arrays.asList("foo_tbl", "bar_tbl")));
+        assertThat(jobConfig.getJobShardingCount(), is(1));
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
deleted file mode 100644
index ab73dbdbefe..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.fixture;
-
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-
-public final class RuleAlteredJobConfigurationPreparerFixture implements RuleAlteredJobConfigurationPreparer {
-    
-    @Override
-    public void extendJobConfiguration(final YamlMigrationJobConfiguration yamlJobConfig) {
-    }
-    
-    @Override
-    public TaskConfiguration createTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
-        return null;
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 98987fcc9f5..50edd3ca848 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
@@ -32,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.core.util.SchemaTableUtil;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -51,6 +53,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -64,6 +67,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Data consistency checker.
  */
+@Slf4j
 public final class DataConsistencyChecker {
     
     // TODO remove jobConfig for common usage
@@ -77,8 +81,10 @@ public final class DataConsistencyChecker {
     
     public DataConsistencyChecker(final MigrationJobConfiguration jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
         this.jobConfig = jobConfig;
-        logicTableNames = jobConfig.splitLogicTableNames();
-        tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
+        logicTableNames = Collections.singletonList(jobConfig.getTargetTableName());
+        // TODO need to get this from the actual data source.
+        Map<String, List<String>> schemaTablesMap = SchemaTableUtil.getSchemaTablesMap(jobConfig.getTargetDatabaseName(), Collections.singleton(jobConfig.getTargetTableName()));
+        tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(schemaTablesMap));
         this.readRateLimitAlgorithm = readRateLimitAlgorithm;
     }
     
@@ -166,7 +172,7 @@ public final class DataConsistencyChecker {
             String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
             String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
             for (String each : logicTableNames) {
-                ShardingSphereTable table = getTableMetaData(jobConfig.getDatabaseName(), each);
+                ShardingSphereTable table = getTableMetaData(jobConfig.getTargetDatabaseName(), each);
                 if (null == table) {
                     throw new PipelineDataConsistencyCheckFailedException("Can not get metadata for table " + each);
                 }
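
With the job configuration no longer carrying a precomputed schema-tables map, the checker now derives it from the single target table at construction time. A minimal sketch of that chain, assuming illustrative database and table names ("sharding_db", "t_order"); the types and static helpers match those used in the constructor above:

    // Sketch: resolve schemas for the lone target table, then build the checker's name mapping.
    private static TableNameSchemaNameMapping buildMapping() {
        Map<String, List<String>> schemaTablesMap = SchemaTableUtil.getSchemaTablesMap("sharding_db", Collections.singleton("t_order"));
        return new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(schemaTablesMap));
    }
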
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 8ca2576d731..f1437b7d125 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -188,6 +188,10 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
                     if (thisResult instanceof SQLXML && thatResult instanceof SQLXML) {
                         return ((SQLXML) thisResult).getString().equals(((SQLXML) thatResult).getString());
                     }
+                    // TODO The standard MySQL JDBC driver converts unsigned mediumint to Integer, but the proxy converts it to Long
+                    if (thisResult instanceof Integer && thatResult instanceof Long) {
+                        return ((Integer) thisResult).longValue() == (Long) thatResult;
+                    }
                     if (!new EqualsBuilder().append(thisResult, thatResult).isEquals()) {
                         log.warn("record column value not match, value1={}, value2={}, record1={}, record2={}", thisResult, thatResult, thisNext, thatNext);
                         return false;
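
The branch above guards against a false mismatch on unsigned MEDIUMINT columns, where the standard MySQL JDBC driver reports Integer while the proxy reports Long for the same value. A standalone sketch of the check, with an illustrative method name and Objects.equals standing in for the EqualsBuilder fallback used in the real method:

    static boolean columnValueMatches(final Object thisResult, final Object thatResult) {
        // Unsigned MEDIUMINT: standard MySQL JDBC yields Integer, the proxy yields Long.
        if (thisResult instanceof Integer && thatResult instanceof Long) {
            return ((Integer) thisResult).longValue() == (Long) thatResult;
        }
        return java.util.Objects.equals(thisResult, thatResult);
    }
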
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
index 860fad8976f..8ddbbadd35a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
@@ -18,9 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.execute;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -44,8 +41,6 @@ public final class PipelineJobWorker {
                 return;
             }
             log.info("start worker initialization");
-            EventBusContext eventBusContext = PipelineContext.getContextManager().getInstanceContext().getEventBusContext();
-            eventBusContext.register(RuleAlteredJobWorker.getInstance());
             new FinishedCheckJobExecutor().start();
             new PipelineJobExecutor().start();
             WORKER_INITIALIZED.set(true);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 3b364c51da6..327e16cd324 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -101,7 +101,7 @@ public final class PipelineJobPreparerUtils {
         }
         String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
         DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-        return PositionInitializerFactory.getInstance(databaseType).init(dataSource);
+        return PositionInitializerFactory.getInstance(databaseType).init(dataSource, dumperConfig.getJobId());
     }
     
     /**
@@ -141,10 +141,11 @@ public final class PipelineJobPreparerUtils {
     /**
      * Cleanup job preparer.
      *
+     * @param jobId job id
      * @param pipelineDataSourceConfig pipeline data source config
      * @throws SQLException sql exception
      */
-    public static void destroyPosition(final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
+    public static void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
         DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
         PositionInitializer positionInitializer = PositionInitializerFactory.getInstance(databaseType.getType());
         log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType());
@@ -152,7 +153,7 @@ public final class PipelineJobPreparerUtils {
             ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig;
             for (DataSourceProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(dataSourceConfig.getRootConfig()).values()) {
                 try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
-                    positionInitializer.destroy(dataSource);
+                    positionInitializer.destroy(dataSource, jobId);
                 }
             }
         }
@@ -161,7 +162,7 @@ public final class PipelineJobPreparerUtils {
             try (
                     PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
                             DataSourcePoolCreator.create((DataSourceProperties) dataSourceConfig.getDataSourceConfiguration()), databaseType)) {
-                positionInitializer.destroy(dataSource);
+                positionInitializer.destroy(dataSource, jobId);
             }
         }
     }
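
The PositionInitializer SPI is now job-scoped on both ends: init takes the job id when creating the position, and destroy takes it when cleaning up, so dialects that allocate named server-side resources (for example PostgreSQL replication slots) can derive a per-job name. A minimal caller sketch, assuming the factory and signatures shown in this diff:

    // Sketch: job-scoped position lifecycle around a migration run.
    private static void positionLifecycle(final String databaseType, final DataSource dataSource, final String jobId) throws SQLException {
        PositionInitializer initializer = PositionInitializerFactory.getInstance(databaseType);
        initializer.init(dataSource, jobId);
        // ... inventory and incremental tasks run from the initialized position ...
        initializer.destroy(dataSource, jobId);
    }
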
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 9d19924716b..e503a4147cd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -103,6 +103,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
         try (Statement statement = targetConnection.createStatement()) {
             statement.execute(sql);
         } catch (final SQLException ex) {
+            log.warn("execute target table sql failed", ex);
             for (String ignoreMessage : IGNORE_EXCEPTION_MESSAGE) {
                 if (ex.getMessage().contains(ignoreMessage)) {
                     return;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java
new file mode 100644
index 00000000000..fe42737b2b7
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Schema table util.
+ */
+@Slf4j
+public final class SchemaTableUtil {
+    
+    /**
+     * Get schema table map.
+     *
+     * @param databaseName database name
+     * @param logicTables logic tables
+     * @return schema table map
+     */
+    public static Map<String, List<String>> getSchemaTablesMap(final String databaseName, final Set<String> logicTables) {
+        // TODO get by search_path
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName);
+        Map<String, List<String>> result = new LinkedHashMap<>();
+        database.getSchemas().forEach((schemaName, schema) -> {
+            for (String each : schema.getAllTableNames()) {
+                if (!logicTables.contains(each)) {
+                    continue;
+                }
+                result.computeIfAbsent(schemaName, unused -> new LinkedList<>()).add(each);
+            }
+        });
+        log.info("getSchemaTablesMap, result={}", result);
+        return result;
+    }
+    
+    /**
+     * Get schema tables map from actual data source.
+     *
+     * @param pipelineDataSourceConfig pipeline data source config
+     * @param tableName table name
+     * @return schema tables map
+     */
+    public static Map<String, List<String>> getSchemaTablesMapFromActual(final PipelineDataSourceConfiguration pipelineDataSourceConfig, final String tableName) {
+        Map<String, List<String>> result = new HashMap<>();
+        try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig)) {
+            try (Connection connection = dataSource.getConnection()) {
+                DatabaseMetaData metaData = connection.getMetaData();
+                try (ResultSet resultSet = metaData.getTables(null, null, tableName, new String[]{"TABLE"})) {
+                    while (resultSet.next()) {
+                        String schemaName = resultSet.getString("TABLE_SCHEM");
+                        result.computeIfAbsent(schemaName, k -> new ArrayList<>()).add(resultSet.getString("TABLE_NAME"));
+                    }
+                }
+            }
+        } catch (final SQLException ex) {
+            log.error("Get schema name map error", ex);
+            throw new AddMigrationSourceResourceException(ex.getMessage());
+        }
+        return result;
+    }
+}
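
A short usage sketch of the JDBC-backed lookup above, assuming sourceDataSourceConfig is an existing PipelineDataSourceConfiguration and "t_order" is an illustrative table name:

    // Resolves the schemas containing "t_order" on the actual source, keyed by schema name.
    Map<String, List<String>> schemaTables = SchemaTableUtil.getSchemaTablesMapFromActual(sourceDataSourceConfig, "t_order");
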
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index fb01beb0a6f..a7908548509 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -19,25 +19,36 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
@@ -49,33 +60,40 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
 import org.apache.shardingsphere.data.pipeline.core.exception.DropMigrationSourceResourceException;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import org.apache.shardingsphere.data.pipeline.core.util.SchemaTableUtil;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
 import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
 
+import javax.sql.DataSource;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
 /**
  * Migration job API impl.
@@ -83,6 +101,12 @@ import java.util.stream.Stream;
 @Slf4j
 public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implements MigrationJobAPI {
     
+    private static final YamlRuleConfigurationSwapperEngine RULE_CONFIG_SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
+    
+    private static final YamlDataSourceConfigurationSwapper DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
+    
+    private static final YamlPipelineDataSourceConfigurationSwapper PIPELINE_DATA_SOURCE_CONFIG_SWAPPER = new YamlPipelineDataSourceConfigurationSwapper();
+    
     private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
     
     private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
@@ -95,16 +119,13 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
     @Override
     protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
         MigrationJobId jobId = (MigrationJobId) pipelineJobId;
-        String text = jobId.getFormatVersion() + "|" + jobId.getCurrentMetadataVersion() + "T" + jobId.getNewMetadataVersion() + "|" + jobId.getDatabaseName();
-        return Hex.encodeHexString(text.getBytes(StandardCharsets.UTF_8), true);
+        String text = jobId.getDatabaseName() + "|" + jobId.getTableName() + "|" + jobId.getSourceDataSourceName();
+        return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
     }
     
     @Override
     public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
         YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig;
-        if (null == config.getJobShardingDataNodes()) {
-            RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(config);
-        }
         if (null == yamlJobConfig.getJobId()) {
             config.setJobId(generateJobId(config));
         }
@@ -116,15 +137,19 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
             PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getTarget().getType(), config.getTarget().getParameter());
             config.setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
         }
+        JobDataNodeEntry nodeEntry = new JobDataNodeEntry(config.getSourceTableName(),
+                Collections.singletonList(new DataNode(config.getSourceDataSourceName(), config.getSourceTableName())));
+        String dataNodeLine = new JobDataNodeLine(Collections.singletonList(nodeEntry)).marshal();
+        config.setTablesFirstDataNodes(dataNodeLine);
+        config.setJobShardingDataNodes(Collections.singletonList(dataNodeLine));
     }
     
     private String generateJobId(final YamlMigrationJobConfiguration config) {
         MigrationJobId jobId = new MigrationJobId();
         jobId.setTypeCode(getJobType().getTypeCode());
-        jobId.setFormatVersion(MigrationJobId.CURRENT_VERSION);
-        jobId.setCurrentMetadataVersion(config.getActiveVersion());
-        jobId.setNewMetadataVersion(config.getNewVersion());
-        jobId.setDatabaseName(config.getDatabaseName());
+        jobId.setSourceDataSourceName(config.getSourceDataSourceName());
+        jobId.setTableName(config.getSourceTableName());
+        jobId.setDatabaseName(config.getTargetDatabaseName());
         return marshalJobId(jobId);
     }
     
@@ -144,7 +169,44 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
     
     @Override
     public TaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
-        return RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig, jobShardingItem, pipelineProcessConfig);
+        Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
+        tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()), new LogicTableName(jobConfig.getSourceTableName()));
+        Map<LogicTableName, String> tableNameSchemaMap = TableNameSchemaNameMapping.convert(SchemaTableUtil.getSchemaTablesMapFromActual(jobConfig.getSource(), jobConfig.getSourceTableName()));
+        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(tableNameSchemaMap);
+        DumperConfiguration dumperConfig = createDumperConfiguration(jobConfig.getJobId(), jobConfig.getSourceDataSourceName(), jobConfig.getSource(), tableNameMap, tableNameSchemaNameMapping);
+        // TODO shardingColumnsMap is always empty for now
+        ImporterConfiguration importerConfig = createImporterConfiguration(jobConfig, pipelineProcessConfig, Collections.emptyMap(), tableNameSchemaNameMapping);
+        TaskConfiguration result = new TaskConfiguration(dumperConfig, importerConfig);
+        log.info("createTaskConfiguration, dataSourceName={}, result={}", jobConfig.getSourceDataSourceName(), result);
+        return result;
+    }
+    
+    private static DumperConfiguration createDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource,
+                                                                 final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+        DumperConfiguration result = new DumperConfiguration();
+        result.setJobId(jobId);
+        result.setDataSourceName(dataSourceName);
+        result.setDataSourceConfig(sourceDataSource);
+        result.setTableNameMap(tableNameMap);
+        result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+        return result;
+    }
+    
+    private static ImporterConfiguration createImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
+                                                                     final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+        PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
+        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+        int retryTimes = jobConfig.getRetryTimes();
+        int concurrency = jobConfig.getConcurrency();
+        return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, retryTimes, concurrency);
+    }
+    
+    private static Map<LogicTableName, Set<String>> unmodifiable(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+        Map<LogicTableName, Set<String>> result = new HashMap<>(shardingColumnsMap.size());
+        for (Entry<LogicTableName, Set<String>> entry : shardingColumnsMap.entrySet()) {
+            result.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
+        }
+        return Collections.unmodifiableMap(result);
     }
     
     @Override
@@ -154,17 +216,13 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         return new MigrationProcessContext(pipelineJobConfig.getJobId(), processConfig);
     }
     
-    private Stream<JobBriefInfo> getJobBriefInfos() {
-        return PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"));
-    }
-    
     protected PipelineJobInfo getJobInfo(final String jobName) {
         MigrationJobInfo result = new MigrationJobInfo(jobName);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId());
         MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
         result.setActive(!jobConfigPOJO.isDisabled());
         result.setShardingTotalCount(jobConfig.getJobShardingCount());
-        result.setTables(jobConfig.getLogicTables());
+        result.setTables(jobConfig.getSourceTableName());
         result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
         result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
         result.setJobParameter(jobConfigPOJO.getJobParameter());
@@ -232,7 +290,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
     
     @Override
     public void stopClusterWriteDB(final MigrationJobConfiguration jobConfig) {
-        String databaseName = jobConfig.getDatabaseName();
+        String databaseName = jobConfig.getTargetDatabaseName();
         LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
         LockDefinition lockDefinition = new ExclusiveLockDefinition(databaseName);
         if (lockContext.isLocked(lockDefinition)) {
@@ -258,7 +316,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
     
     @Override
     public void restoreClusterWriteDB(final MigrationJobConfiguration jobConfig) {
-        String databaseName = jobConfig.getDatabaseName();
+        String databaseName = jobConfig.getTargetDatabaseName();
         LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
         LockDefinition lockDefinition = new ExclusiveLockDefinition(databaseName);
         if (lockContext.isLocked(lockDefinition)) {
@@ -370,23 +428,6 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
     
     @Override
     public void switchClusterConfiguration(final MigrationJobConfiguration jobConfig) {
-        String jobId = jobConfig.getJobId();
-        MigrationProcessContext processContext = buildPipelineProcessContext(jobConfig);
-        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
-        if (isDataConsistencyCheckNeeded(processContext)) {
-            Optional<Boolean> checkResult = repositoryAPI.getJobCheckResult(jobId);
-            if (!checkResult.isPresent() || !checkResult.get()) {
-                throw new PipelineVerifyFailedException("Data consistency check is not finished or failed.");
-            }
-        }
-        ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(jobConfig.getDatabaseName(), jobConfig.getActiveVersion(), jobConfig.getNewVersion());
-        PipelineContext.getContextManager().getInstanceContext().getEventBusContext().post(taskFinishedEvent);
-        for (int each : repositoryAPI.getShardingItems(jobId)) {
-            PipelineJobCenter.getJobItemContext(jobId, each).ifPresent(jobItemContext -> jobItemContext.setStatus(JobStatus.FINISHED));
-            updateJobItemStatus(jobId, each, JobStatus.FINISHED);
-        }
-        PipelineJobCenter.stop(jobId);
-        stop(jobId);
     }
     
     @Override
@@ -428,6 +469,53 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         dataSourcePersistService.persist(getJobType(), metaDataDataSource);
     }
     
+    @Override
+    public void createJobAndStart(final CreateMigrationJobParameter parameter) {
+        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
+        Map<String, DataSourceProperties> metaDataDataSource = dataSourcePersistService.load(JobType.MIGRATION);
+        Map<String, Object> sourceDataSourceProps = DATA_SOURCE_CONFIG_SWAPPER.swapToMap(metaDataDataSource.get(parameter.getSourceResourceName()));
+        YamlPipelineDataSourceConfiguration sourcePipelineDataSourceConfiguration = createYamlPipelineDataSourceConfiguration(StandardPipelineDataSourceConfiguration.TYPE,
+                YamlEngine.marshal(sourceDataSourceProps));
+        result.setSource(sourcePipelineDataSourceConfiguration);
+        result.setSourceDatabaseType(new StandardPipelineDataSourceConfiguration(sourceDataSourceProps).getDatabaseType().getType());
+        result.setSourceDataSourceName(parameter.getSourceResourceName());
+        result.setSourceTableName(parameter.getSourceTableName());
+        Map<String, Map<String, Object>> targetDataSourceProperties = new HashMap<>();
+        ShardingSphereDatabase targetDatabase = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(parameter.getTargetDatabaseName());
+        for (Entry<String, DataSource> entry : targetDatabase.getResource().getDataSources().entrySet()) {
+            Map<String, Object> dataSourceProps = DATA_SOURCE_CONFIG_SWAPPER.swapToMap(DataSourcePropertiesCreator.create(entry.getValue()));
+            targetDataSourceProperties.put(entry.getKey(), dataSourceProps);
+        }
+        String targetDatabaseName = parameter.getTargetDatabaseName();
+        YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(targetDatabaseName, targetDataSourceProperties, targetDatabase.getRuleMetaData().getConfigurations());
+        PipelineDataSourceConfiguration targetPipelineDataSource = new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
+        result.setTarget(createYamlPipelineDataSourceConfiguration(targetPipelineDataSource.getType(), YamlEngine.marshal(targetPipelineDataSource.getDataSourceConfiguration())));
+        result.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
+        result.setTargetDatabaseName(targetDatabaseName);
+        result.setTargetTableName(parameter.getTargetTableName());
+        result.setSchemaTablesMap(SchemaTableUtil.getSchemaTablesMapFromActual(PIPELINE_DATA_SOURCE_CONFIG_SWAPPER.swapToObject(sourcePipelineDataSourceConfiguration),
+                parameter.getSourceTableName()));
+        extendYamlJobConfiguration(result);
+        MigrationJobConfiguration jobConfiguration = new YamlMigrationJobConfigurationSwapper().swapToObject(result);
+        start(jobConfiguration);
+    }
+    
+    private YamlRootConfiguration getYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
+        YamlRootConfiguration result = new YamlRootConfiguration();
+        result.setDatabaseName(databaseName);
+        result.setDataSources(yamlDataSources);
+        Collection<YamlRuleConfiguration> yamlRuleConfigurations = RULE_CONFIG_SWAPPER_ENGINE.swapToYamlRuleConfigurations(rules);
+        result.setRules(yamlRuleConfigurations);
+        return result;
+    }
+    
+    private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final String type, final String parameter) {
+        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
+        result.setType(type);
+        result.setParameter(parameter);
+        return result;
+    }
+    
     @Override
     public String getType() {
         return getJobType().getTypeName();
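
createJobAndStart is now the single entry point for DistSQL: it swaps the registered source resource plus the target database's own data sources and rule configurations into a complete YAML job configuration, then starts the job. A hedged usage sketch; the all-args constructor order below is an assumption (only the getters are visible in this diff), and it assumes MigrationJobAPIFactory#getInstance returns the API implementation:

    // Assumed argument order: source resource name, source table, target database, target table.
    CreateMigrationJobParameter parameter = new CreateMigrationJobParameter("ds_0", "t_order", "sharding_db", "t_order");
    MigrationJobAPIFactory.getInstance().createJobAndStart(parameter);
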
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index f204d9d774a..94d2bc1825b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.Getter;
-import lombok.NonNull;
 import lombok.Setter;
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
@@ -33,9 +32,7 @@ public final class MigrationJobId extends AbstractPipelineJobId {
     
     public static final String CURRENT_VERSION = "01";
     
-    @NonNull
-    private Integer currentMetadataVersion;
+    private String tableName;
     
-    @NonNull
-    private Integer newMetadataVersion;
+    private String sourceDataSourceName;
 }
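
A migration job id is now keyed by target database, source table, and source data source name instead of metadata versions; marshalJobIdLeftPart above joins those fields with '|' and MD5-hexes the result. A sketch of the derivation with illustrative values:

    // Mirrors marshalJobIdLeftPart: databaseName|tableName|sourceDataSourceName, MD5-hexed.
    String text = "sharding_db" + "|" + "t_order" + "|" + "ds_0";
    String jobIdLeftPart = DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
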
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index eae08b224aa..0c47a28a51a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -42,8 +41,6 @@ import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.datanode.DataNodes;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -52,11 +49,11 @@ import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
+import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * Migration job preparer.
@@ -140,22 +137,22 @@ public final class MigrationJobPreparer {
         TableNameSchemaNameMapping tableNameSchemaNameMapping = jobItemContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
         String targetDatabaseType = jobConfig.getTargetDatabaseType();
         if (isSourceAndTargetSchemaAvailable(jobConfig)) {
-            PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(jobConfig.splitLogicTableNames(),
-                    DatabaseTypeFactory.getInstance(targetDatabaseType), jobConfig.getDatabaseName(),
+            PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(Collections.singletonList(jobConfig.getTargetTableName()),
+                    DatabaseTypeFactory.getInstance(targetDatabaseType), jobConfig.getTargetDatabaseName(),
                     jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), jobItemContext.getDataSourceManager(), tableNameSchemaNameMapping);
             PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParameter);
         }
         ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
-        ShardingSphereDatabase sphereDatabase = metaData.getDatabases().get(jobConfig.getDatabaseName());
+        ShardingSphereDatabase sphereDatabase = metaData.getDatabases().get(jobConfig.getTargetDatabaseName());
         ShardingSphereSQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(sphereDatabase.getProtocolType().getType());
         JobDataNodeLine jobDataNodeLine = JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
         Map<String, String> tableNameMap = new HashMap<>();
-        for (JobDataNodeEntry each : jobDataNodeLine.getEntries()) {
-            String actualTableName = getActualTable(sphereDatabase, each.getLogicTableName());
-            tableNameMap.put(each.getLogicTableName(), actualTableName);
-        }
-        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobConfig.getDatabaseName(),
-                jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), sphereDatabase.getResource().getDataSources(), jobItemContext.getDataSourceManager(),
+        tableNameMap.put(jobConfig.getTargetTableName(), jobConfig.getSourceTableName());
+        PipelineDataSourceWrapper dataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
+        Map<String, DataSource> sourceDataSourceMap = new HashMap<>(1, 1.0F);
+        sourceDataSourceMap.put(jobConfig.getSourceDataSourceName(), dataSource);
+        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobConfig.getTargetDatabaseName(),
+                jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), sourceDataSourceMap, jobItemContext.getDataSourceManager(),
                 jobDataNodeLine, tableNameMap, tableNameSchemaNameMapping, sqlParserEngine);
         PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, prepareTargetTablesParameter);
     }
@@ -170,14 +167,6 @@ public final class MigrationJobPreparer {
         return true;
     }
     
-    private String getActualTable(final ShardingSphereDatabase database, final String tableName) {
-        DataNodes dataNodes = new DataNodes(database.getRuleMetaData().getRules());
-        Optional<DataNode> filteredDataNode = dataNodes.getDataNodes(tableName).stream()
-                .filter(each -> database.getResource().getDataSources().containsKey(each.getDataSourceName().contains(".") ? each.getDataSourceName().split("\\.")[0] : each.getDataSourceName()))
-                .findFirst();
-        return filteredDataNode.map(DataNode::getTableName).orElse(tableName);
-    }
-    
     private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
         InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(),
                 jobItemContext.getJobProcessContext().getImporterExecuteEngine(), jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(), jobItemContext.getInitProgress());
@@ -205,7 +194,7 @@ public final class MigrationJobPreparer {
      */
     public void cleanup(final MigrationJobConfiguration jobConfig) {
         try {
-            PipelineJobPreparerUtils.destroyPosition(jobConfig.getSource());
+            PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(), jobConfig.getSource());
         } catch (final SQLException ex) {
             log.warn("Scaling job destroying failed", ex);
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
deleted file mode 100644
index 7d81e4c0960..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-
-import com.google.common.base.Preconditions;
-import com.google.common.eventbus.Subscribe;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
-import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Rule altered job worker.
- */
-@SuppressWarnings("UnstableApiUsage")
-@Slf4j
-public final class RuleAlteredJobWorker {
-    
-    private static final RuleAlteredJobWorker INSTANCE = new RuleAlteredJobWorker();
-    
-    private static final YamlRuleConfigurationSwapperEngine SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
-    
-    /**
-     * Get instance.
-     *
-     * @return instance
-     */
-    public static RuleAlteredJobWorker getInstance() {
-        return INSTANCE;
-    }
-    
-    /**
-     * Is on rule altered action enabled.
-     *
-     * @param ruleConfig rule configuration
-     * @return enabled or not
-     */
-    public static boolean isOnRuleAlteredActionEnabled(final RuleConfiguration ruleConfig) {
-        if (null == ruleConfig) {
-            return false;
-        }
-        Optional<RuleAlteredDetector> detector = RuleAlteredDetectorFactory.findInstance(ruleConfig);
-        return detector.isPresent() && detector.get().getOnRuleAlteredActionConfig(ruleConfig).isPresent();
-    }
-    
-    /**
-     * Create rule altered context.
-     *
-     * @param jobConfig job configuration
-     * @return rule altered context
-     */
-    public static MigrationProcessContext createRuleAlteredContext(final MigrationJobConfiguration jobConfig) {
-        YamlRootConfiguration targetRootConfig = getYamlRootConfig(jobConfig);
-        YamlRuleConfiguration yamlRuleConfig = null;
-        for (YamlRuleConfiguration each : targetRootConfig.getRules()) {
-            if (jobConfig.getAlteredRuleYamlClassNameTablesMap().containsKey(each.getClass().getName())) {
-                yamlRuleConfig = each;
-                break;
-            }
-        }
-        if (null == yamlRuleConfig) {
-            throw new PipelineJobCreationException("could not find altered rule");
-        }
-        RuleConfiguration ruleConfig = SWAPPER_ENGINE.swapToRuleConfiguration(yamlRuleConfig);
-        Optional<RuleAlteredDetector> detector = RuleAlteredDetectorFactory.findInstance(ruleConfig);
-        Preconditions.checkState(detector.isPresent());
-        Optional<OnRuleAlteredActionConfiguration> onRuleAlteredActionConfig = detector.get().getOnRuleAlteredActionConfig(ruleConfig);
-        if (!onRuleAlteredActionConfig.isPresent()) {
-            log.error("rule altered action enabled but actor is not configured, ignored, ruleConfig={}", ruleConfig);
-            throw new PipelineJobCreationException("rule altered actor not configured");
-        }
-        return new MigrationProcessContext(jobConfig.getJobId(), onRuleAlteredActionConfig.get());
-    }
-    
-    /**
-     * Get YAML root configuration, which should include rule altered action configuration.
-     *
-     * @param jobConfig job configuration
-     * @return YAML root configuration
-     */
-    private static YamlRootConfiguration getYamlRootConfig(final MigrationJobConfiguration jobConfig) {
-        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
-        if (targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
-            return ((ShardingSpherePipelineDataSourceConfiguration) targetDataSourceConfig).getRootConfig();
-        }
-        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
-        return ((ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig).getRootConfig();
-    }
-    
-    /**
-     * Start scaling job.
-     *
-     * @param event start scaling event.
-     */
-    @Subscribe
-    public void start(final StartScalingEvent event) {
-        log.info("Start scaling job by {}", event);
-        if (hasUncompletedJobOfSameDatabaseName(event.getDatabaseName())) {
-            log.warn("There is uncompleted job with the same database name, please handle it first, current job will be ignored");
-            return;
-        }
-        Optional<MigrationJobConfiguration> jobConfig = createJobConfig(event);
-        if (jobConfig.isPresent()) {
-            MigrationJobAPIFactory.getInstance().start(jobConfig.get());
-        } else {
-            log.info("Switch rule configuration immediately.");
-            ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getDatabaseName(), event.getActiveVersion(), event.getNewVersion());
-            PipelineContext.getContextManager().getInstanceContext().getEventBusContext().post(taskFinishedEvent);
-        }
-    }
-    
-    private Optional<MigrationJobConfiguration> createJobConfig(final StartScalingEvent event) {
-        YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getDatabaseName(), event.getSourceDataSource(), event.getSourceRule());
-        YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getDatabaseName(), event.getTargetDataSource(), event.getTargetRule());
-        Map<String, List<String>> alteredRuleYamlClassNameTablesMap = new HashMap<>();
-        for (Pair<YamlRuleConfiguration, YamlRuleConfiguration> each : groupSourceTargetRuleConfigsByType(sourceRootConfig.getRules(), targetRootConfig.getRules())) {
-            YamlRuleConfiguration yamlRuleConfig = null == each.getLeft() ? each.getRight() : each.getLeft();
-            Optional<RuleAlteredDetector> detector = RuleAlteredDetectorFactory.findInstance(yamlRuleConfig);
-            if (!detector.isPresent()) {
-                continue;
-            }
-            List<String> ruleAlteredLogicTables = detector.get().findRuleAlteredLogicTables(each.getLeft(), each.getRight(), sourceRootConfig.getDataSources(), targetRootConfig.getDataSources());
-            log.info("type={}, ruleAlteredLogicTables={}", yamlRuleConfig.getClass().getName(), ruleAlteredLogicTables);
-            if (!ruleAlteredLogicTables.isEmpty()) {
-                alteredRuleYamlClassNameTablesMap.put(yamlRuleConfig.getClass().getName(), ruleAlteredLogicTables);
-            }
-        }
-        if (alteredRuleYamlClassNameTablesMap.isEmpty()) {
-            log.error("no altered rule");
-            throw new PipelineJobCreationException("no altered rule");
-        }
-        if (alteredRuleYamlClassNameTablesMap.size() > 1) {
-            log.error("more than 1 rule altered");
-            throw new PipelineJobCreationException("more than 1 rule altered");
-        }
-        YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
-        result.setDatabaseName(event.getDatabaseName());
-        result.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap);
-        result.setActiveVersion(event.getActiveVersion());
-        result.setNewVersion(event.getNewVersion());
-        result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
-        result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
-        PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
-        return Optional.of(new YamlMigrationJobConfigurationSwapper().swapToObject(result));
-    }
-    
-    private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> groupSourceTargetRuleConfigsByType(final Collection<YamlRuleConfiguration> sourceRules,
-                                                                                                              final Collection<YamlRuleConfiguration> targetRules) {
-        Map<Class<? extends YamlRuleConfiguration>, YamlRuleConfiguration> sourceRulesMap = sourceRules.stream().collect(Collectors.toMap(YamlRuleConfiguration::getClass, Function.identity()));
-        Map<Class<? extends YamlRuleConfiguration>, YamlRuleConfiguration> targetRulesMap = targetRules.stream().collect(Collectors.toMap(YamlRuleConfiguration::getClass, Function.identity()));
-        Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> result = new LinkedList<>();
-        for (Entry<Class<? extends YamlRuleConfiguration>, YamlRuleConfiguration> entry : sourceRulesMap.entrySet()) {
-            YamlRuleConfiguration targetRule = targetRulesMap.get(entry.getKey());
-            result.add(Pair.of(entry.getValue(), targetRule));
-        }
-        for (Entry<Class<? extends YamlRuleConfiguration>, YamlRuleConfiguration> entry : targetRulesMap.entrySet()) {
-            if (!sourceRulesMap.containsKey(entry.getKey())) {
-                result.add(Pair.of(null, entry.getValue()));
-            }
-        }
-        return result;
-    }
-    
-    private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final YamlRootConfiguration yamlConfig) {
-        PipelineDataSourceConfiguration config = new ShardingSpherePipelineDataSourceConfiguration(yamlConfig);
-        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
-        result.setType(config.getType());
-        result.setParameter(config.getParameter());
-        return result;
-    }
-    
-    @SuppressWarnings("unchecked")
-    private YamlRootConfiguration getYamlRootConfiguration(final String databaseName, final String dataSources, final String rules) {
-        YamlRootConfiguration result = new YamlRootConfiguration();
-        result.setDatabaseName(databaseName);
-        Map<String, Map<String, Object>> yamlDataSources = YamlEngine.unmarshal(dataSources, Map.class);
-        result.setDataSources(yamlDataSources);
-        Collection<YamlRuleConfiguration> yamlRuleConfigs = YamlEngine.unmarshal(rules, Collection.class, true);
-        result.setRules(yamlRuleConfigs);
-        return result;
-    }
-    
-    private boolean hasUncompletedJobOfSameDatabaseName(final String databaseName) {
-        boolean result = false;
-        for (PipelineJobInfo each : MigrationJobAPIFactory.getInstance().list()) {
-            if (MigrationJobAPIFactory.getInstance().getJobProgress(each.getJobId()).values().stream()
-                    .allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) {
-                continue;
-            }
-            MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(each.getJobParameter());
-            if (databaseName.equals(jobConfig.getDatabaseName())) {
-                result = true;
-                break;
-            }
-        }
-        return result;
-    }
-}
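
With this worker removed, migration jobs are no longer triggered by StartScalingEvent; they are created explicitly through the new MigrationJobAPI entry point shown in the fixture below. A minimal sketch of that flow, assuming CreateMigrationJobParameter exposes a no-arg constructor and setters (only the type name and createJobAndStart(...) are visible in this diff):

    import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
    import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
    
    public final class MigrationJobStartSketch {
        
        public static void main(final String[] args) {
            // Assumption: a default constructor; the parameter fields are not shown in this diff.
            CreateMigrationJobParameter parameter = new CreateMigrationJobParameter();
            // Hypothetical setters mirroring the new MigrationJobId fields (database name,
            // source data source name, table name) would be called here.
            MigrationJobAPIFactory.getInstance().createJobAndStart(parameter);
        }
    }
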
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index 424ee393d5b..a95657857cf 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContex
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
@@ -166,6 +167,10 @@ public final class MigrationJobAPIFixture implements MigrationJobAPI {
     public void dropMigrationSourceResources(final Collection<String> resourceNames) {
     }
     
+    @Override
+    public void createJobAndStart(final CreateMigrationJobParameter parameter) {
+    }
+    
     @Override
     public MigrationJobConfiguration getJobConfiguration(final String jobId) {
         return null;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java
index 0d57ff5638c..0f423a77a86 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java
@@ -26,7 +26,7 @@ import java.sql.SQLException;
 public final class FixturePositionInitializer implements PositionInitializer {
     
     @Override
-    public IngestPosition<?> init(final DataSource dataSource) throws SQLException {
+    public IngestPosition<?> init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         return null;
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index 2b9ceeb066e..7263a01bab3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -30,10 +30,9 @@ public final class PipelineJobIdUtilsTest {
     public void assertCodec() {
         MigrationJobId pipelineJobId = new MigrationJobId();
         pipelineJobId.setTypeCode(JobType.MIGRATION.getTypeCode());
-        pipelineJobId.setFormatVersion(MigrationJobId.CURRENT_VERSION);
         pipelineJobId.setDatabaseName("sharding_db");
-        pipelineJobId.setCurrentMetadataVersion(0);
-        pipelineJobId.setNewMetadataVersion(1);
+        pipelineJobId.setSourceDataSourceName("new_0");
+        pipelineJobId.setTableName("t_order");
         String jobId = PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
         JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
         assertThat(actualJobType, is(JobType.MIGRATION));
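
A minimal sketch of composing the refactored MigrationJobId, which now carries the source data source and table names instead of metadata versions; every call below appears in the test above:

    import org.apache.shardingsphere.data.pipeline.api.job.JobType;
    import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
    import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
    
    public final class MigrationJobIdSketch {
        
        public static void main(final String[] args) {
            MigrationJobId jobId = new MigrationJobId();
            jobId.setTypeCode(JobType.MIGRATION.getTypeCode());
            jobId.setDatabaseName("sharding_db");
            jobId.setSourceDataSourceName("new_0");
            jobId.setTableName("t_order");
            // The marshalled prefix encodes the job type; a random suffix completes the id.
            String id = PipelineJobIdUtils.marshalJobIdCommonPrefix(jobId) + "abcd";
            assert JobType.MIGRATION == PipelineJobIdUtils.parseJobType(id);
        }
    }
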
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializer.java
index 637afcd625e..8411be2acdb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializer.java
@@ -33,7 +33,7 @@ import java.sql.SQLException;
 public final class MySQLPositionInitializer implements PositionInitializer {
     
     @Override
-    public BinlogPosition init(final DataSource dataSource) throws SQLException {
+    public BinlogPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             BinlogPosition result = getBinlogPosition(connection);
             result.setServerId(getServerId(connection));
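
MySQL tracks incremental positions with binlog coordinates rather than replication slots, so the new slotNameSuffix parameter is accepted but unused by this dialect. A minimal call sketch, assuming a configured DataSource (import paths are assumptions); the test below passes an empty suffix the same way:

    import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLPositionInitializer;
    import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
    
    import javax.sql.DataSource;
    import java.sql.SQLException;
    
    public final class MySQLPositionSketch {
        
        public static BinlogPosition currentPosition(final DataSource dataSource) throws SQLException {
            // The suffix is irrelevant for binlog positions, so an empty string is fine.
            return new MySQLPositionInitializer().init(dataSource, "");
        }
    }
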
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializerTest.java
index a5501bbfd46..40f8df40373 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializerTest.java
@@ -62,7 +62,7 @@ public final class MySQLPositionInitializerTest {
     @Test
     public void assertGetCurrentPosition() throws SQLException {
         MySQLPositionInitializer mySQLPositionInitializer = new MySQLPositionInitializer();
-        BinlogPosition actual = mySQLPositionInitializer.init(dataSource);
+        BinlogPosition actual = mySQLPositionInitializer.init(dataSource, "");
         assertThat(actual.getServerId(), is(SERVER_ID));
         assertThat(actual.getFilename(), is(LOG_FILE_NAME));
         assertThat(actual.getPosition(), is(LOG_POSITION));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 98cc575d786..5686c12076b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -35,7 +34,6 @@ import org.mockito.MockedStatic;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.sql.SQLException;
-import java.util.Collections;
 
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.same;
@@ -86,7 +84,6 @@ public final class MySQLDataSourcePreparerTest {
         when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
         when(jobConfig.getTarget().getParameter()).thenReturn("target");
         when(prepareTargetTablesParameter.getDatabaseName()).thenReturn("test_db");
-        when(prepareTargetTablesParameter.getTablesFirstDataNodes()).thenReturn(new JobDataNodeLine(Collections.emptyList()));
     }
     
     @Test
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index c05d33c4e3c..49ec98c05d4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -45,9 +45,9 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
     private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
     
     @Override
-    public WalPosition init(final DataSource dataSource) throws SQLException {
+    public WalPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            createSlotIfNotExist(connection);
+            createSlotIfNotExist(connection, slotNameSuffix);
             return getWalPosition(connection);
         }
     }
@@ -63,10 +63,10 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
      * @param connection connection
      * @throws SQLException SQL exception
      */
-    private void createSlotIfNotExist(final Connection connection) throws SQLException {
-        String slotName = getUniqueSlotName(connection);
+    private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException {
+        String slotName = getUniqueSlotName(connection, slotNameSuffix);
         if (!isSlotExist(connection, slotName)) {
-            createSlotBySQL(connection);
+            createSlotBySQL(connection, slotName);
         }
     }
     
@@ -81,8 +81,8 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
         }
     }
     
-    private void createSlotBySQL(final Connection connection) throws SQLException {
-        String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", getUniqueSlotName(connection), DECODE_PLUGIN);
+    private void createSlotBySQL(final Connection connection, final String slotName) throws SQLException {
+        String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
         try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
             preparedStatement.execute();
         } catch (final SQLException ex) {
@@ -102,14 +102,14 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
     }
     
     @Override
-    public void destroy(final DataSource dataSource) throws SQLException {
+    public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            dropSlotIfExist(connection);
+            dropSlotIfExist(connection, slotNameSuffix);
         }
     }
     
-    private void dropSlotIfExist(final Connection connection) throws SQLException {
-        String slotName = getUniqueSlotName(connection);
+    private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException {
+        String slotName = getUniqueSlotName(connection, slotNameSuffix);
         if (!isSlotExist(connection, slotName)) {
             log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
             return;
@@ -124,12 +124,13 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
      * Get the unique slot name by connection.
      *
      * @param connection connection
+     * @param slotNameSuffix slot name suffix
      * @return the unique name by connection
      * @throws SQLException failed when getCatalog
      */
-    public static String getUniqueSlotName(final Connection connection) throws SQLException {
+    public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
         // same as PostgreSQL, but a length over 64 throws an exception directly
-        String slotName = DigestUtils.md5Hex(connection.getCatalog());
+        String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
         return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
     }
     
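
A minimal sketch of how a caller can check for the job-scoped slot before creating it, mirroring createSlotIfNotExist above; the pg_replication_slots catalog query is standard for both openGauss and PostgreSQL. Combined with getUniqueSlotName(connection, jobId), this yields one slot per job per database:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    public final class SlotExistenceSketch {
        
        public static boolean slotExists(final Connection connection, final String slotName) throws SQLException {
            String sql = "SELECT slot_name FROM pg_replication_slots WHERE slot_name = ?";
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setString(1, slotName);
                try (ResultSet resultSet = statement.executeQuery()) {
                    // A matching row means the logical replication slot already exists.
                    return resultSet.next();
                }
            }
        }
    }
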
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 2769ad9303a..8362bed849d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -77,7 +77,7 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
     private void dump() {
         PGReplicationStream stream = null;
         try (PgConnection connection = getReplicationConnectionUnwrap()) {
-            stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection));
+            stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
             DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()));
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 178f4e5c3c4..02855ad13bf 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -43,9 +43,9 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
     private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
     
     @Override
-    public WalPosition init(final DataSource dataSource) throws SQLException {
+    public WalPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            createSlotIfNotExist(connection, getUniqueSlotName(connection));
+            createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix));
             return getWalPosition(connection);
         }
     }
@@ -101,14 +101,14 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
     }
     
     @Override
-    public void destroy(final DataSource dataSource) throws SQLException {
+    public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            dropSlotIfExist(connection);
+            dropSlotIfExist(connection, slotNameSuffix);
         }
     }
     
-    private void dropSlotIfExist(final Connection connection) throws SQLException {
-        String slotName = getUniqueSlotName(connection);
+    private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException {
+        String slotName = getUniqueSlotName(connection, slotNameSuffix);
         if (!isSlotExisting(connection, slotName)) {
             log.info("dropSlotIfExist, slot not exist, slotName={}", slotName);
             return;
@@ -125,12 +125,13 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
      * Get the unique slot name by connection.
      *
      * @param connection the connection
+     * @param slotNameSuffix slot name suffix
      * @return the unique name by connection
      * @throws SQLException failed when getCatalog
      */
-    public static String getUniqueSlotName(final Connection connection) throws SQLException {
+    public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
         // PostgreSQL slot name maximum length is 64; it is truncated automatically when the length exceeds the limit
-        String slotName = DigestUtils.md5Hex(connection.getCatalog());
+        String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
         return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
     }
     
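
A minimal sketch of the suffixed slot-name derivation introduced above: hashing "<catalog>_<suffix>" keeps the name stable per database and job while staying under the 64-character limit. The hashing call mirrors the added line exactly; the SLOT_NAME_PREFIX value is an assumption, since the constant is not shown in this diff:

    import org.apache.commons.codec.digest.DigestUtils;
    
    public final class SlotNameSketch {
        
        // Assumption: the actual prefix constant is defined elsewhere in the initializer.
        private static final String SLOT_NAME_PREFIX = "pipeline";
        
        public static String uniqueSlotName(final String catalog, final String slotNameSuffix) {
            // md5 keeps the name short and deterministic for a given database and job.
            String slotName = DigestUtils.md5Hex(String.join("_", catalog, slotNameSuffix).getBytes());
            return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
        }
    }
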
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index c844cb063d6..67f181511ab 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -79,7 +79,8 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
         // TODO use unified PgConnection
         try (
                 Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig());
-                PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection), walPosition.getLogSequenceNumber())) {
+                PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()),
+                        walPosition.getLogSequenceNumber())) {
             PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
             DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
             while (isRunning()) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
index 44f411357ee..99d6325270c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
@@ -61,8 +61,8 @@ public final class PostgreSQLPositionInitializerTest {
         when(connection.getCatalog()).thenReturn("sharding_db");
         when(connection.getMetaData()).thenReturn(databaseMetaData);
         PreparedStatement lsn96PreparedStatement = mockPostgreSQL96LSN();
-        when(connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", PostgreSQLPositionInitializer.getUniqueSlotName(connection), "test_decoding")))
-                .thenReturn(mock(PreparedStatement.class));
+        when(connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", PostgreSQLPositionInitializer.getUniqueSlotName(connection, ""),
+                "test_decoding"))).thenReturn(mock(PreparedStatement.class));
         when(connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()")).thenReturn(lsn96PreparedStatement);
         PreparedStatement lsn10PreparedStatement = mockPostgreSQL10LSN();
         when(connection.prepareStatement("SELECT PG_CURRENT_WAL_LSN()")).thenReturn(lsn10PreparedStatement);
@@ -73,7 +73,7 @@ public final class PostgreSQLPositionInitializerTest {
         mockSlotExistsOrNot(false);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
-        WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
+        WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
         assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
     }
     
@@ -81,7 +81,7 @@ public final class PostgreSQLPositionInitializerTest {
     public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
         mockSlotExistsOrNot(false);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
-        WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
+        WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
         assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
     }
     
@@ -90,7 +90,7 @@ public final class PostgreSQLPositionInitializerTest {
         mockSlotExistsOrNot(false);
         when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
         when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
-        new PostgreSQLPositionInitializer().init(dataSource);
+        new PostgreSQLPositionInitializer().init(dataSource, "");
     }
     
     @SneakyThrows(SQLException.class)
@@ -127,7 +127,7 @@ public final class PostgreSQLPositionInitializerTest {
         mockSlotExistsOrNot(true);
         PreparedStatement preparedStatement = mock(PreparedStatement.class);
         when(connection.prepareStatement("SELECT pg_drop_replication_slot(?)")).thenReturn(preparedStatement);
-        new PostgreSQLPositionInitializer().destroy(dataSource);
+        new PostgreSQLPositionInitializer().destroy(dataSource, "");
         verify(preparedStatement).execute();
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index fef17423483..7425970029a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -37,6 +37,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.MockedStatic;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.postgresql.jdbc.PgConnection;
 import org.postgresql.replication.LogSequenceNumber;
@@ -51,6 +52,9 @@ import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -103,6 +107,7 @@ public final class PostgreSQLWalDumperTest {
         }
         PipelineDataSourceConfiguration dataSourceConfig = new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password);
         DumperConfiguration result = new DumperConfiguration();
+        result.setJobId("0101123455F45SCALING8898");
         result.setDataSourceConfig(dataSourceConfig);
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")));
         result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
@@ -116,13 +121,16 @@ public final class PostgreSQLWalDumperTest {
             ReflectionUtil.setFieldValue(walDumper, "logicalReplication", logicalReplication);
             when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
             when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
-            when(pgConnection.getCatalog()).thenReturn("test_db");
-            when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection), position.getLogSequenceNumber()))
-                    .thenReturn(pgReplicationStream);
-            ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE: order_id[integer]:1".getBytes());
-            when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException(""));
-            when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
-            walDumper.start();
+            try (MockedStatic<PostgreSQLPositionInitializer> positionInitializer = mockStatic(PostgreSQLPositionInitializer.class)) {
+                positionInitializer.when(() -> PostgreSQLPositionInitializer.getUniqueSlotName(eq(pgConnection), anyString())).thenReturn("0101123455F45SCALING8898");
+                when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection, ""), position.getLogSequenceNumber()))
+                        .thenReturn(pgReplicationStream);
+                ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE: order_id[integer]:1".getBytes());
+                when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException(""));
+                when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
+                // TODO NPE occurred here
+                walDumper.start();
+            }
         } catch (final IngestException ignored) {
         }
         assertThat(channel.fetchRecords(100, 0).size(), is(1));
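
A minimal sketch of the Mockito mockStatic pattern this test adopts: static stubbing is scoped to a try-with-resources block so it cannot leak into other tests (the stubbed slot name here is arbitrary):

    import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLPositionInitializer;
    import org.mockito.MockedStatic;
    
    import java.sql.Connection;
    
    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.ArgumentMatchers.eq;
    import static org.mockito.Mockito.mockStatic;
    
    public final class MockedStaticSketch {
        
        public static void withStubbedSlotName(final Connection connection, final Runnable testBody) {
            try (MockedStatic<PostgreSQLPositionInitializer> mocked = mockStatic(PostgreSQLPositionInitializer.class)) {
                mocked.when(() -> PostgreSQLPositionInitializer.getUniqueSlotName(eq(connection), anyString())).thenReturn("fixed_slot_name");
                // All calls to the static method inside this scope return the stubbed value.
                testBody.run();
            }
        }
    }
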
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index 7e17bb3476a..da85cb3f2fb 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.rule;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import org.apache.shardingsphere.distsql.parser.statement.rdl.RuleDefinitionStatement;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import org.apache.shardingsphere.infra.distsql.exception.DistSQLException;
@@ -76,12 +75,7 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
         RuleConfiguration currentRuleConfig = findCurrentRuleConfiguration(database, ruleConfigClass).orElse(null);
         ruleDefinitionUpdater.checkSQLStatement(database, sqlStatement, currentRuleConfig);
         Optional<RuleDefinitionAlterPreprocessor> preprocessor = RuleDefinitionAlterPreprocessorFactory.findInstance(sqlStatement);
-        if (!RuleAlteredJobWorker.isOnRuleAlteredActionEnabled(currentRuleConfig)) {
-            if (RULE_ALTERED_ACTIONS.contains(sqlStatement.getClass().getCanonicalName())) {
-                // TODO throw new RuntimeException("scaling is not enabled");
-                log.warn("rule altered and scaling is not enabled.");
-            }
-        } else if (preprocessor.isPresent()) {
+        if (preprocessor.isPresent()) {
             prepareScaling(database, sqlStatement, (RuleDefinitionAlterUpdater) ruleDefinitionUpdater, currentRuleConfig, preprocessor.get());
             return new UpdateResponseHeader(sqlStatement);
         }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
index a84fa4e4360..9ac98ddad79 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
@@ -26,8 +26,6 @@ import org.apache.shardingsphere.test.integration.env.container.atomic.util.Data
 import javax.xml.bind.JAXB;
 import java.util.Objects;
 
-import static org.junit.Assert.assertTrue;
-
 @Slf4j
 public abstract class BaseExtraSQLITCase extends BaseITCase {
     
@@ -39,42 +37,23 @@ public abstract class BaseExtraSQLITCase extends BaseITCase {
         extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseExtraSQLITCase.class.getClassLoader().getResource(parameterized.getScenario())), ExtraSQLCommand.class);
     }
     
-    protected void createNoUseTable() {
-        executeWithLog("CREATE SHARDING TABLE RULE no_use (RESOURCES(ds_0, ds_1), SHARDING_COLUMN=sharding_id, TYPE(NAME='MOD',PROPERTIES('sharding-count'='4')))");
-        executeWithLog("CREATE TABLE no_use(id int(11) NOT NULL,sharding_id int(11) NOT NULL, PRIMARY KEY (id))");
-    }
-    
-    protected void createOrderTable() {
-        executeWithLog(extraSQLCommand.getCreateTableOrder());
+    protected void createSourceOrderTable() {
+        sourceExecuteWithLog(extraSQLCommand.getCreateTableOrder());
     }
     
-    protected void createTableIndexList(final String schema) {
+    protected void createSourceTableIndexList(final String schema) {
         if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
-            executeWithLog(String.format("CREATE INDEX IF NOT EXISTS idx_user_id ON %s.t_order ( user_id )", schema));
+            sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS idx_user_id ON %s.t_order ( user_id )", schema));
         } else if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
-            executeWithLog(String.format("CREATE INDEX idx_user_id ON %s.t_order ( user_id )", schema));
+            sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON %s.t_order ( user_id )", schema));
         }
     }
     
-    protected void createOrderItemTable() {
-        executeWithLog(extraSQLCommand.getCreateTableOrderItem());
+    protected void createSourceCommentOnList(final String schema) {
+        sourceExecuteWithLog(String.format("COMMENT ON COLUMN %s.t_order.user_id IS 'user id'", schema));
     }
     
-    private boolean executeSql(final String sql) {
-        try {
-            getJdbcTemplate().execute(sql);
-            return true;
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            // TODO openGauss seem return the different error message, need to check it
-            if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
-                log.info("openGauss error msg:{}", ex.getMessage());
-                return false;
-            } else {
-                assertTrue(ex.getMessage(), ex.getCause().getMessage().endsWith("The database sharding_db is read-only"));
-            }
-            return false;
-        }
+    protected void createSourceOrderItemTable() {
+        sourceExecuteWithLog(extraSQLCommand.getCreateTableOrderItem());
     }
 }
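
The helpers above now route DDL to the source data source instead of the proxy. The real sourceExecuteWithLog(...) lives in BaseITCase (changed below); a hypothetical sketch of its shape, assuming plain JDBC execution with logging:

    import lombok.extern.slf4j.Slf4j;
    
    import javax.sql.DataSource;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    @Slf4j
    public abstract class SourceExecuteSketch {
        
        protected abstract DataSource getSourceDataSource();
        
        protected void sourceExecuteWithLog(final String sql) throws SQLException {
            log.info("source execute: {}", sql);
            try (Connection connection = getSourceDataSource().getConnection(); Statement statement = connection.createStatement()) {
                statement.execute(sql);
            }
        }
    }
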
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 8ccd188320e..b0dcc2f8433 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -29,12 +29,11 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.integration.data.pipeline.cases.command.CommonSQLCommand;
+import org.apache.shardingsphere.integration.data.pipeline.cases.command.MigrationDistSQLCommand;
 import org.apache.shardingsphere.integration.data.pipeline.env.IntegrationTestEnvironment;
 import org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
 import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.BaseComposedContainer;
-import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.DockerComposedContainer;
+import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.MigrationComposedContainer;
 import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.NativeComposedContainer;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import org.apache.shardingsphere.integration.data.pipeline.framework.watcher.ScalingWatcher;
@@ -44,17 +43,18 @@ import org.apache.shardingsphere.test.integration.env.container.atomic.util.Data
 import org.apache.shardingsphere.test.integration.env.runtime.DataSourceEnvironment;
 import org.junit.Rule;
 import org.opengauss.util.PSQLException;
-import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.BadSqlGrammarException;
-import org.springframework.jdbc.core.JdbcTemplate;
 
 import javax.sql.DataSource;
 import javax.xml.bind.JAXB;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -80,8 +80,6 @@ public abstract class BaseITCase {
     
     protected static final String DS_0 = "scaling_it_0";
     
-    protected static final String DS_1 = "scaling_it_1";
-    
     protected static final String DS_2 = "scaling_it_2";
     
     protected static final String DS_3 = "scaling_it_3";
@@ -98,7 +96,7 @@ public abstract class BaseITCase {
     
     private final BaseComposedContainer composedContainer;
     
-    private final CommonSQLCommand commonSQLCommand;
+    private final MigrationDistSQLCommand migrationDistSQLCommand;
     
     private final DatabaseType databaseType;
     
@@ -106,7 +104,9 @@ public abstract class BaseITCase {
     
     private final String password;
     
-    private JdbcTemplate jdbcTemplate;
+    private DataSource sourceDataSource;
+    
+    private DataSource proxyDataSource;
     
     @Setter
     private Thread increaseTaskThread;
@@ -114,13 +114,13 @@ public abstract class BaseITCase {
     public BaseITCase(final ScalingParameterized parameterized) {
         databaseType = parameterized.getDatabaseType();
         if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
-            composedContainer = new DockerComposedContainer(parameterized.getDatabaseType(), parameterized.getDockerImageName());
+            composedContainer = new MigrationComposedContainer(parameterized.getDatabaseType(), parameterized.getDockerImageName());
         } else {
             composedContainer = new NativeComposedContainer(parameterized.getDatabaseType());
         }
         composedContainer.start();
         if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
-            DockerStorageContainer storageContainer = ((DockerComposedContainer) composedContainer).getStorageContainer();
+            DockerStorageContainer storageContainer = ((MigrationComposedContainer) composedContainer).getStorageContainer();
             username = storageContainer.getUsername();
             password = storageContainer.getUnifiedPassword();
         } else {
@@ -131,12 +131,12 @@ public abstract class BaseITCase {
         if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
             cleanUpDataSource();
         }
-        commonSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/command.xml")), CommonSQLCommand.class);
-        scalingWatcher = new ScalingWatcher(composedContainer, jdbcTemplate);
+        migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/command.xml")), MigrationDistSQLCommand.class);
+        scalingWatcher = new ScalingWatcher(composedContainer);
     }
     
     private void cleanUpDataSource() {
-        for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
+        for (String each : Arrays.asList(DS_0, DS_2, DS_3, DS_4)) {
             composedContainer.cleanUpDatabase(each);
         }
     }
@@ -150,131 +150,104 @@ public abstract class BaseITCase {
         try (Connection connection = DriverManager.getConnection(jdbcUrl, ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
             if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
                 try {
-                    executeWithLog(connection, "DROP DATABASE sharding_db");
+                    connectionExecuteWithLog(connection, "DROP DATABASE sharding_db");
                 } catch (final SQLException ex) {
                     log.warn("Drop sharding_db failed, maybe it does not exist. error msg={}", ex.getMessage());
                 }
             }
-            executeWithLog(connection, "CREATE DATABASE sharding_db");
+            connectionExecuteWithLog(connection, "CREATE DATABASE sharding_db");
         } catch (final SQLException ex) {
             throw new IllegalStateException(ex);
         }
-        jdbcTemplate = new JdbcTemplate(getProxyDataSource("sharding_db"));
+        sourceDataSource = getDataSource(getActualJdbcUrlTemplate(DS_0, false), username, password);
+        proxyDataSource = getDataSource(composedContainer.getProxyJdbcUrl("sharding_db"), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
     }
     
-    protected DataSource getProxyDataSource(final String databaseName) {
+    private DataSource getDataSource(final String jdbcUrl, final String username, final String password) {
         HikariDataSource result = new HikariDataSource();
         result.setDriverClassName(DataSourceEnvironment.getDriverClassName(getDatabaseType()));
-        result.setJdbcUrl(composedContainer.getProxyJdbcUrl(databaseName));
-        result.setUsername(ProxyContainerConstants.USERNAME);
-        result.setPassword(ProxyContainerConstants.PASSWORD);
+        result.setJdbcUrl(jdbcUrl);
+        result.setUsername(username);
+        result.setPassword(password);
         result.setMaximumPoolSize(2);
         result.setTransactionIsolation("TRANSACTION_READ_COMMITTED");
         return result;
     }
     
-    protected boolean waitShardingAlgorithmEffect(final int maxWaitTimes) {
-        long startTime = System.currentTimeMillis();
-        int waitTimes = 0;
-        do {
-            List<Map<String, Object>> result = queryForListWithLog("SHOW SHARDING ALGORITHMS");
-            if (result.size() >= 3) {
-                log.info("waitShardingAlgorithmEffect time consume: {}", System.currentTimeMillis() - startTime);
-                return true;
-            }
-            ThreadUtil.sleep(2, TimeUnit.SECONDS);
-            waitTimes++;
-        } while (waitTimes <= maxWaitTimes);
-        return false;
-    }
-    
-    @SneakyThrows
+    @SneakyThrows(SQLException.class)
     protected void addSourceResource() {
-        // TODO if mysql can append database firstly, they can be combined
-        if (databaseType instanceof MySQLDatabaseType) {
-            try (Connection connection = DriverManager.getConnection(getComposedContainer().getProxyJdbcUrl(""), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
-                connection.createStatement().execute("USE sharding_db");
-                addSourceResource0(connection);
-            }
-        } else {
-            try (Connection connection = DriverManager.getConnection(getComposedContainer().getProxyJdbcUrl("sharding_db"), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
-                addSourceResource0(connection);
-            }
+        try (Connection connection = DriverManager.getConnection(getComposedContainer().getProxyJdbcUrl("sharding_db"), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
+            addSourceResource0(connection);
         }
-        List<Map<String, Object>> resources = queryForListWithLog("SHOW DATABASE RESOURCES FROM sharding_db");
-        assertThat(resources.size(), is(2));
     }
     
-    private void addSourceResource0(final Connection connection) throws SQLException {
-        String addSourceResource = commonSQLCommand.getSourceAddResourceTemplate().replace("${user}", username)
+    @SneakyThrows(SQLException.class)
+    private void addSourceResource0(final Connection connection) {
+        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
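+            // native mode runs against a long-lived environment, so drop any source resource left over from a previous run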
+            try {
+                connectionExecuteWithLog(connection, "DROP MIGRATION SOURCE RESOURCE ds_0");
+            } catch (final SQLException ex) {
+                log.warn("Drop sharding_db failed, maybe it's not exist. error msg={}", ex.getMessage());
+            }
+        }
+        String addSourceResource = migrationDistSQLCommand.getAddMigrationSourceResourceTemplate().replace("${user}", username)
                 .replace("${password}", password)
-                .replace("${ds0}", getActualJdbcUrlTemplate(DS_0))
-                .replace("${ds1}", getActualJdbcUrlTemplate(DS_1));
-        executeWithLog(connection, addSourceResource);
+                .replace("${ds0}", getActualJdbcUrlTemplate(DS_0, true));
+        connectionExecuteWithLog(connection, addSourceResource);
     }
     
     @SneakyThrows
     protected void addTargetResource() {
-        String addTargetResource = commonSQLCommand.getTargetAddResourceTemplate().replace("${user}", username)
+        String addTargetResource = migrationDistSQLCommand.getAddMigrationTargetResourceTemplate().replace("${user}", username)
                 .replace("${password}", password)
-                .replace("${ds2}", getActualJdbcUrlTemplate(DS_2))
-                .replace("${ds3}", getActualJdbcUrlTemplate(DS_3))
-                .replace("${ds4}", getActualJdbcUrlTemplate(DS_4));
-        executeWithLog(addTargetResource);
+                .replace("${ds2}", getActualJdbcUrlTemplate(DS_2, true))
+                .replace("${ds3}", getActualJdbcUrlTemplate(DS_3, true))
+                .replace("${ds4}", getActualJdbcUrlTemplate(DS_4, true));
+        proxyExecuteWithLog(addTargetResource, 2);
         List<Map<String, Object>> resources = queryForListWithLog("SHOW DATABASE RESOURCES FROM sharding_db");
-        assertThat(resources.size(), is(5));
-        assertBeforeApplyScalingMetadataCorrectly();
+        assertThat(resources.size(), is(3));
     }
     
-    private String getActualJdbcUrlTemplate(final String databaseName) {
+    private String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer) {
         if (ScalingITEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
-            DockerStorageContainer storageContainer = ((DockerComposedContainer) composedContainer).getStorageContainer();
-            return DataSourceEnvironment.getURL(getDatabaseType(), getDatabaseType().getType().toLowerCase() + ".host", storageContainer.getPort(), databaseName);
+            DockerStorageContainer storageContainer = ((MigrationComposedContainer) composedContainer).getStorageContainer();
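+            // in-container URLs use the Docker network host alias; otherwise use the host-mapped port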
+            if (isInContainer) {
+                return DataSourceEnvironment.getURL(getDatabaseType(), getDatabaseType().getType().toLowerCase() + ".host", storageContainer.getPort(), databaseName);
+            } else {
+                return DataSourceEnvironment.getURL(getDatabaseType(), storageContainer.getHost(), storageContainer.getFirstMappedPort(), databaseName);
+            }
         }
         return DataSourceEnvironment.getURL(getDatabaseType(), "127.0.0.1", ENV.getActualDataSourceDefaultPort(databaseType), databaseName);
     }
     
-    protected void initShardingAlgorithm() {
-        executeWithLog(getCommonSQLCommand().getCreateDatabaseShardingAlgorithm());
-        executeWithLog(getCommonSQLCommand().getCreateOrderShardingAlgorithm());
-        executeWithLog(getCommonSQLCommand().getCreateOrderItemShardingAlgorithm());
+    protected void createTargetOrderTableRule() {
+        proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableRule(), 3);
     }
     
-    protected void createOrderTableRule() {
-        executeWithLog(commonSQLCommand.getCreateOrderTableRule());
+    protected void createTargetOrderItemTableRule() {
+        proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(), 3);
     }
     
-    protected void createOrderItemTableRule() {
-        executeWithLog(commonSQLCommand.getCreateOrderItemTableRule());
+    protected void startMigrationOrder() {
+        proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTable(), 5);
     }
     
-    protected void bindingShardingRule() {
-        executeWithLog("CREATE SHARDING BINDING TABLE RULES (t_order,t_order_item)");
+    protected void startMigrationOrderItem() {
+        proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTable(), 5);
     }
     
+    // TODO use new DistSQL
     protected void createScalingRule() {
-        if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
-            try {
-                List<Map<String, Object>> scalingList = jdbcTemplate.queryForList("SHOW MIGRATION LIST");
-                for (Map<String, Object> each : scalingList) {
-                    String id = each.get("id").toString();
-                    executeWithLog(String.format("CLEAN MIGRATION '%s'", id), 0);
-                }
-            } catch (final DataAccessException ex) {
-                log.error("Failed to show migration list. {}", ex.getMessage());
-            }
-        }
-        executeWithLog("CREATE SHARDING SCALING RULE scaling_manual (INPUT(SHARDING_SIZE=1000), DATA_CONSISTENCY_CHECKER(TYPE(NAME='DATA_MATCH')))");
     }
     
-    protected void createSchema(final String schemaName) {
+    protected void createSourceSchema(final String schemaName) {
         if (DatabaseTypeUtil.isPostgreSQL(databaseType)) {
-            executeWithLog(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
+            sourceExecuteWithLog(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
             return;
         }
         if (DatabaseTypeUtil.isOpenGauss(databaseType)) {
             try {
-                executeWithLog(String.format("CREATE SCHEMA %s", schemaName));
+                sourceExecuteWithLog(String.format("CREATE SCHEMA %s", schemaName));
             } catch (final BadSqlGrammarException ex) {
                 // only used for native mode.
                 if (ex.getCause() instanceof PSQLException && "42P06".equals(((PSQLException) ex.getCause()).getSQLState())) {
@@ -286,78 +259,90 @@ public abstract class BaseITCase {
         }
     }
     
-    protected void executeWithLog(final Connection connection, final String sql) throws SQLException {
-        log.info("connection execute:{}", sql);
-        connection.createStatement().execute(sql);
-        ThreadUtil.sleep(1, TimeUnit.SECONDS);
+    @SneakyThrows(SQLException.class)
+    protected void sourceExecuteWithLog(final String sql) {
+        log.info("source execute :{}", sql);
+        try (Connection connection = sourceDataSource.getConnection()) {
+            connection.createStatement().execute(sql);
+        }
     }
     
-    private void executeWithLog(final String sql, final int sleepSeconds) {
-        log.info("jdbcTemplate execute:{}", sql);
-        jdbcTemplate.execute(sql);
+    @SneakyThrows(SQLException.class)
+    protected void proxyExecuteWithLog(final String sql, final int sleepSeconds) {
+        log.info("proxy execute :{}", sql);
+        try (Connection connection = proxyDataSource.getConnection()) {
+            connection.createStatement().execute(sql);
+        }
         ThreadUtil.sleep(Math.max(sleepSeconds, 0), TimeUnit.SECONDS);
     }
     
-    protected void executeWithLog(final String sql) {
-        executeWithLog(sql, 2);
+    protected void connectionExecuteWithLog(final Connection connection, final String sql) throws SQLException {
+        log.info("connection execute:{}", sql);
+        connection.createStatement().execute(sql);
+        ThreadUtil.sleep(2, TimeUnit.SECONDS);
     }
     
     protected List<Map<String, Object>> queryForListWithLog(final String sql) {
         int retryNumber = 0;
         while (retryNumber <= 3) {
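+            // retry transient failures: at most four attempts, three seconds apart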
-            try {
-                return jdbcTemplate.queryForList(sql);
-            } catch (final DataAccessException ex) {
+            try (Connection connection = proxyDataSource.getConnection()) {
+                ResultSet resultSet = connection.createStatement().executeQuery(sql);
+                return resultSetToList(resultSet);
+            } catch (final SQLException ex) {
                 log.error("data access error", ex);
             }
-            ThreadUtil.sleep(2, TimeUnit.SECONDS);
+            ThreadUtil.sleep(3, TimeUnit.SECONDS);
             retryNumber++;
         }
         throw new RuntimeException("can't get result from proxy");
     }
     
+    protected List<Map<String, Object>> resultSetToList(final ResultSet rs) throws SQLException {
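+        // materialize the whole result set; column labels are lower-cased for case-insensitive lookup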
+        ResultSetMetaData md = rs.getMetaData();
+        int columns = md.getColumnCount();
+        List<Map<String, Object>> results = new ArrayList<>();
+        while (rs.next()) {
+            Map<String, Object> row = new HashMap<>();
+            for (int i = 1; i <= columns; i++) {
+                row.put(md.getColumnLabel(i).toLowerCase(), rs.getObject(i));
+            }
+            results.add(row);
+        }
+        return results;
+    }
+    
     protected void startIncrementTask(final BaseIncrementTask baseIncrementTask) {
         setIncreaseTaskThread(new Thread(baseIncrementTask));
         getIncreaseTaskThread().start();
     }
     
-    protected void stopScalingSourceWriting(final String jobId) {
-        executeWithLog(String.format("STOP MIGRATION SOURCE WRITING '%s'", jobId));
-    }
-    
-    protected void stopScaling(final String jobId) {
-        executeWithLog(String.format("STOP MIGRATION '%s'", jobId), 5);
-    }
-    
-    protected void startScaling(final String jobId) {
-        executeWithLog(String.format("START MIGRATION '%s'", jobId), 10);
+    protected void stopMigration(final String jobId) {
+        proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 5);
     }
     
-    protected void applyScaling(final String jobId) {
-        assertBeforeApplyScalingMetadataCorrectly();
-        executeWithLog(String.format("APPLY MIGRATION '%s'", jobId));
+    // TODO reopen later
+    protected void startMigrationByJob(final String jobId) {
+        proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 10);
     }
     
-    protected void assertBeforeApplyScalingMetadataCorrectly() {
-        List<Map<String, Object>> previewResults = queryForListWithLog("PREVIEW SELECT COUNT(1) FROM t_order");
-        assertThat("data_source_name name not correct, it's effective early, search watcher failed get more info",
-                previewResults.stream().map(each -> each.get("data_source_name")).collect(Collectors.toSet()), is(new HashSet<>(Arrays.asList("ds_0", "ds_1"))));
+    protected List<String> listJobId() {
+        List<Map<String, Object>> jobList = queryForListWithLog("SHOW MIGRATION LIST");
+        return jobList.stream().map(each -> each.get("id").toString()).collect(Collectors.toList());
     }
     
-    protected String getScalingJobId() {
-        List<Map<String, Object>> scalingListMap = queryForListWithLog("SHOW MIGRATION LIST");
-        String jobId = scalingListMap.get(0).get("id").toString();
-        log.info("jobId: {}", jobId);
-        return jobId;
+    protected String getJobIdByTableName(final String tableName) {
+        List<Map<String, Object>> jobList = queryForListWithLog("SHOW MIGRATION LIST");
+        return jobList.stream().filter(each -> each.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new RuntimeException("target table not found")).get("id").toString();
     }
     
-    protected void waitScalingFinished(final String jobId) throws InterruptedException {
+    @SneakyThrows(InterruptedException.class)
+    protected void waitMigrationFinished(final String jobId) {
         if (null != increaseTaskThread) {
             TimeUnit.SECONDS.timedJoin(increaseTaskThread, 60);
         }
         log.info("jobId: {}", jobId);
-        Set<String> actualStatus = null;
-        for (int i = 0; i < 20; i++) {
+        Set<String> actualStatus;
+        for (int i = 0; i < 10; i++) {
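+            // poll the migration status, up to 10 rounds with three seconds between rounds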
             List<Map<String, Object>> showScalingStatusResult = showScalingStatus(jobId);
             log.info("show migration status result: {}", showScalingStatusResult);
             actualStatus = showScalingStatusResult.stream().map(each -> each.get("status").toString()).collect(Collectors.toSet());
@@ -368,14 +353,13 @@ public abstract class BaseITCase {
             } else if (actualStatus.size() >= 1 && actualStatus.containsAll(new HashSet<>(Arrays.asList("", JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
                 log.warn("one of the shardingItem was not started correctly");
             }
-            ThreadUtil.sleep(2, TimeUnit.SECONDS);
+            ThreadUtil.sleep(3, TimeUnit.SECONDS);
         }
-        assertThat(actualStatus, is(Collections.singleton(JobStatus.EXECUTE_INCREMENTAL_TASK.name())));
     }
     
     protected void assertGreaterThanInitTableInitRows(final int tableInitRows, final String schema) {
         String countSQL = StringUtils.isBlank(schema) ? "SELECT COUNT(*) as count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order", schema);
-        Map<String, Object> actual = jdbcTemplate.queryForMap(countSQL);
+        Map<String, Object> actual = queryForListWithLog(countSQL).get(0);
         assertTrue("actual count " + actual.get("count"), Integer.parseInt(actual.get("count").toString()) > tableInitRows);
     }
     
@@ -392,7 +376,7 @@ public abstract class BaseITCase {
         }
         boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
         log.info("second check job result: {}", secondCheckJobResult);
-        stopScalingSourceWriting(jobId);
+        proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
         List<Map<String, Object>> checkScalingResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='DATA_MATCH')", jobId));
         log.info("checkScalingResults: {}", checkScalingResults);
         for (Map<String, Object> entry : checkScalingResults) {
@@ -408,17 +392,10 @@ public abstract class BaseITCase {
                 return false;
             }
             int incrementalIdleSeconds = Integer.parseInt(entry.get("incremental_idle_seconds").toString());
-            if (incrementalIdleSeconds <= 3) {
+            if (incrementalIdleSeconds < 10) {
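+                // the job is not considered finished until the incremental stage has been idle for at least 10 seconds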
                 return false;
             }
         }
         return true;
     }
-    
-    protected void assertPreviewTableSuccess(final String tableName, final List<String> expect) {
-        List<Map<String, Object>> actualResults = queryForListWithLog(String.format("PREVIEW SELECT COUNT(1) FROM %s", tableName));
-        List<String> dataSourceNames = actualResults.stream().map(each -> String.valueOf(each.get("data_source_name"))).sorted().collect(Collectors.toList());
-        Collections.sort(expect);
-        assertThat(dataSourceNames, is(expect));
-    }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/CommonSQLCommand.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
similarity index 52%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/CommonSQLCommand.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
index 6cdbad8abf2..18a6ed2caa1 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/CommonSQLCommand.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
@@ -27,35 +27,23 @@ import javax.xml.bind.annotation.XmlRootElement;
 @Data
 @XmlRootElement(name = "command")
 @XmlAccessorType(XmlAccessType.FIELD)
-public final class CommonSQLCommand {
+public final class MigrationDistSQLCommand {
     
-    @XmlElement(name = "create-database-sharding-algorithm")
-    private String createDatabaseShardingAlgorithm;
+    @XmlElement(name = "create-target-order-table-rule")
+    private String createTargetOrderTableRule;
     
-    @XmlElement(name = "create-order-sharding-algorithm")
-    private String createOrderShardingAlgorithm;
+    @XmlElement(name = "create-target-order-item-table-rule")
+    private String createTargetOrderItemTableRule;
     
-    @XmlElement(name = "create-order-item-sharding-algorithm")
-    private String createOrderItemShardingAlgorithm;
+    @XmlElement(name = "add-migration-source-resource-template")
+    private String addMigrationSourceResourceTemplate;
     
-    @XmlElement(name = "create-order-table-rule")
-    private String createOrderTableRule;
+    @XmlElement(name = "add-migration-target-resource-template")
+    private String addMigrationTargetResourceTemplate;
     
-    @XmlElement(name = "create-order-item-table-rule")
-    private String createOrderItemTableRule;
+    @XmlElement(name = "migration-order-single-table")
+    private String migrationOrderSingleTable;
     
-    @XmlElement(name = "alter-sharding-algorithm")
-    private String alterShardingAlgorithm;
-    
-    @XmlElement(name = "alter-order-with-item-auto-table-rule")
-    private String alterOrderWithItemAutoTableRule;
-    
-    @XmlElement(name = "alter-order-auto-table-rule")
-    private String alterOrderAutoTableRule;
-    
-    @XmlElement(name = "source-add-resource-template")
-    private String sourceAddResourceTemplate;
-    
-    @XmlElement(name = "target-add-resource-template")
-    private String targetAddResourceTemplate;
+    @XmlElement(name = "migration-order-item-single-table")
+    private String migrationOrderItemSingleTable;
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
similarity index 68%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
index b2dbaf06509..d7a8fa4f61e 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
@@ -26,28 +26,27 @@ import org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEn
 import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
+import org.springframework.jdbc.core.JdbcTemplate;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.junit.Assert.assertTrue;
-
 /**
  * General MySQL migration test case, covering multiple scenarios.
  */
 @Slf4j
 @RunWith(Parameterized.class)
-public final class MySQLGeneralScalingIT extends BaseExtraSQLITCase {
+public final class MySQLMigrationGeneralIT extends BaseExtraSQLITCase {
     
     private final ScalingParameterized parameterized;
     
-    public MySQLGeneralScalingIT(final ScalingParameterized parameterized) {
+    public MySQLMigrationGeneralIT(final ScalingParameterized parameterized) {
         super(parameterized);
         this.parameterized = parameterized;
         log.info("parameterized:{}", parameterized);
@@ -67,35 +66,39 @@ public final class MySQLGeneralScalingIT extends BaseExtraSQLITCase {
     }
     
     @Test
-    public void assertManualScalingSuccess() throws InterruptedException {
-        addSourceResource();
-        initShardingAlgorithm();
-        assertTrue(waitShardingAlgorithmEffect(15));
+    public void assertMigrationSuccess() {
         createScalingRule();
-        createOrderTableRule();
-        createOrderItemTableRule();
-        createNoUseTable();
-        createOrderTable();
-        createOrderItemTable();
+        createSourceOrderTable();
+        createSourceOrderItemTable();
+        addSourceResource();
+        addTargetResource();
+        createTargetOrderTableRule();
+        createTargetOrderItemTableRule();
         SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
+        JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
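+        // seed the source tables in 1000-row batches before starting the migration jobs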
         for (int i = 0; i < TABLE_INIT_ROW_COUNT / 1000; i++) {
             Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), 1000);
-            getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
-            getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+            jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
+            jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
         }
-        addTargetResource();
-        startIncrementTask(new MySQLIncrementTask(getJdbcTemplate(), keyGenerateAlgorithm, true, 20));
-        executeWithLog(getCommonSQLCommand().getAlterOrderWithItemAutoTableRule());
-        String jobId = getScalingJobId();
-        waitScalingFinished(jobId);
-        stopScaling(jobId);
-        getJdbcTemplate().update("INSERT INTO t_order (id,order_id,user_id,status,t_json) VALUES (?, ?, ?, ?, ?)", keyGenerateAlgorithm.generateKey(), keyGenerateAlgorithm.generateKey(),
-                1, "afterStopScaling", "{}");
-        startScaling(jobId);
+        checkOrderMigration(keyGenerateAlgorithm, jdbcTemplate);
+        checkOrderItemMigration();
+    }
+    
+    private void checkOrderMigration(final KeyGenerateAlgorithm keyGenerateAlgorithm, final JdbcTemplate jdbcTemplate) {
+        startMigrationOrder();
+        startIncrementTask(new MySQLIncrementTask(jdbcTemplate, keyGenerateAlgorithm, true, 20));
+        String jobId = getJobIdByTableName("t_order");
+        waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
         assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "");
-        applyScaling(jobId);
-        assertPreviewTableSuccess("t_order", Arrays.asList("ds_2", "ds_3", "ds_4"));
-        assertPreviewTableSuccess("t_order_item", Arrays.asList("ds_2", "ds_3", "ds_4"));
+    }
+    
+    private void checkOrderItemMigration() {
+        startMigrationOrderItem();
+        String jobId = getJobIdByTableName("t_order_item");
+        waitMigrationFinished(jobId);
+        assertCheckScalingSuccess(jobId);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
similarity index 67%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
index c4358a615ef..6cd69231d09 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
@@ -31,24 +31,22 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
+import org.springframework.jdbc.core.JdbcTemplate;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.junit.Assert.assertTrue;
-
 /**
  * PostgreSQL general migration test case; also covers openGauss, which uses the same process.
  */
 @Slf4j
 @RunWith(Parameterized.class)
-public final class PostgreSQLGeneralScalingIT extends BaseExtraSQLITCase {
+public final class PostgreSQLMigrationGeneralIT extends BaseExtraSQLITCase {
     
     private final ScalingParameterized parameterized;
     
-    public PostgreSQLGeneralScalingIT(final ScalingParameterized parameterized) {
+    public PostgreSQLMigrationGeneralIT(final ScalingParameterized parameterized) {
         super(parameterized);
         this.parameterized = parameterized;
         log.info("parameterized:{}", parameterized);
@@ -70,36 +68,39 @@ public final class PostgreSQLGeneralScalingIT extends BaseExtraSQLITCase {
     }
     
     @Test
-    public void assertManualScalingSuccess() throws InterruptedException {
-        addSourceResource();
-        initShardingAlgorithm();
-        assertTrue(waitShardingAlgorithmEffect(15));
+    public void assertMigrationSuccess() {
         createScalingRule();
-        createSchema("test");
-        createOrderTableRule();
-        createOrderItemTableRule();
-        createOrderTable();
-        createOrderItemTable();
-        // TODO wait kernel support create index if not exists
-        // createTableIndexList("test");
-        executeWithLog("COMMENT ON COLUMN test.t_order.user_id IS 'user id';");
+        createSourceSchema("test");
+        createSourceOrderTable();
+        createSourceOrderItemTable();
+        createSourceTableIndexList("test");
+        createSourceCommentOnList("test");
+        addSourceResource();
+        addTargetResource();
+        createTargetOrderTableRule();
+        createTargetOrderItemTableRule();
         SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
         Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
-        getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
-        getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
-        addTargetResource();
-        startIncrementTask(new PostgreSQLIncrementTask(getJdbcTemplate(), new SnowflakeKeyGenerateAlgorithm(), "test", true, 20));
-        executeWithLog(getCommonSQLCommand().getAlterOrderWithItemAutoTableRule());
-        String jobId = getScalingJobId();
-        waitScalingFinished(jobId);
-        stopScaling(jobId);
-        executeWithLog(String.format("INSERT INTO test.t_order (id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')", keyGenerateAlgorithm.generateKey(), System.currentTimeMillis(),
-                1, "afterStopScaling"));
-        startScaling(jobId);
+        JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
+        jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
+        jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+        checkOrderMigration(jdbcTemplate);
+        checkOrderItemMigration();
+    }
+    
+    private void checkOrderMigration(final JdbcTemplate jdbcTemplate) {
+        startMigrationOrder();
+        startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, "test", false, 20));
+        String jobId = getJobIdByTableName("t_order");
+        waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
         assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "test");
-        applyScaling(jobId);
-        assertPreviewTableSuccess("t_order", Arrays.asList("ds_2", "ds_3", "ds_4"));
-        assertPreviewTableSuccess("t_order_item", Arrays.asList("ds_2", "ds_3", "ds_4"));
+    }
+    
+    private void checkOrderItemMigration() {
+        startMigrationOrderItem();
+        String jobId = getJobIdByTableName("t_order_item");
+        waitMigrationFinished(jobId);
+        assertCheckScalingSuccess(jobId);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
similarity index 71%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyScalingIT.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
index 416f462d6f9..fbe0c01c3ce 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
@@ -30,20 +30,18 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.junit.Assert.assertTrue;
-
 @RunWith(Parameterized.class)
 @Slf4j
-public class TextPrimaryKeyScalingIT extends BaseExtraSQLITCase {
+public class TextPrimaryKeyMigrationIT extends BaseExtraSQLITCase {
     
-    public TextPrimaryKeyScalingIT(final ScalingParameterized parameterized) {
+    public TextPrimaryKeyMigrationIT(final ScalingParameterized parameterized) {
         super(parameterized);
         log.info("parameterized:{}", parameterized);
     }
@@ -67,29 +65,32 @@ public class TextPrimaryKeyScalingIT extends BaseExtraSQLITCase {
     }
     
     @Test
-    public void assertTextPrimaryKeyScalingSuccess() throws InterruptedException {
-        addSourceResource();
-        initShardingAlgorithm();
-        assertTrue(waitShardingAlgorithmEffect(15));
-        createScalingRule();
-        createOrderTableRule();
-        createOrderTable();
+    public void assertTextPrimaryKeyMigrationSuccess() throws SQLException {
+        createSourceOrderTable();
         batchInsertOrder();
+        createScalingRule();
+        addSourceResource();
         addTargetResource();
-        executeWithLog(getCommonSQLCommand().getAlterOrderAutoTableRule());
-        String jobId = getScalingJobId();
-        waitScalingFinished(jobId);
+        createTargetOrderTableRule();
+        startMigrationOrder();
+        String jobId = listJobId().get(0);
+        waitMigrationFinished(jobId);
+        stopMigration(jobId);
         assertCheckScalingSuccess(jobId);
-        applyScaling(jobId);
-        assertPreviewTableSuccess("t_order", Arrays.asList("ds_2", "ds_3", "ds_4"));
     }
     
-    private void batchInsertOrder() {
+    private void batchInsertOrder() throws SQLException {
         UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
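+        // UUID keys cover migration of tables whose primary key is text rather than numeric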
-        List<Object[]> orderData = new ArrayList<>(TABLE_INIT_ROW_COUNT);
-        for (int i = 0; i < TABLE_INIT_ROW_COUNT; i++) {
-            orderData.add(new Object[]{keyGenerateAlgorithm.generateKey(), ThreadLocalRandom.current().nextInt(0, 6), ThreadLocalRandom.current().nextInt(0, 6), "OK"});
+        try (
+                Connection connection = getSourceDataSource().getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO t_order (id,order_id,user_id,status) VALUES (?,?,?,?)")) {
+            for (int i = 0; i < TABLE_INIT_ROW_COUNT; i++) {
+                preparedStatement.setObject(1, keyGenerateAlgorithm.generateKey());
+                preparedStatement.setObject(2, ThreadLocalRandom.current().nextInt(0, 6));
+                preparedStatement.setObject(3, ThreadLocalRandom.current().nextInt(0, 6));
+                preparedStatement.setObject(4, "OK");
+                preparedStatement.addBatch();
+            }
+            preparedStatement.executeBatch();
         }
-        getJdbcTemplate().batchUpdate("INSERT INTO t_order (id,order_id,user_id,status) VALUES (?,?,?,?)", orderData);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
index f6faace0b69..14bebdc5c5f 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
@@ -79,7 +79,7 @@ public final class MySQLIncrementTask extends BaseIncrementTask {
     private void updateOrderByPrimaryKey(final Object primaryKey) {
         Object[] updateData = {"updated" + Instant.now().getEpochSecond(), ThreadLocalRandom.current().nextInt(0, 100), primaryKey};
         jdbcTemplate.update("UPDATE t_order SET status = ?,t_unsigned_int = ? WHERE id = ?", updateData);
-        jdbcTemplate.update("UPDATE t_order SET status = null,t_unsigned_int = 299,t_timestamp='0000-00-00 00:00:00' WHERE id = ?", primaryKey);
+        jdbcTemplate.update("UPDATE t_order SET status = null,t_unsigned_int = 299,t_datetime='0000-00-00 00:00:00' WHERE id = ?", primaryKey);
     }
     
     private void setNullToOrderFields(final Object primaryKey) {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index ff22ce7f4cd..352e5b00cc0 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -22,19 +22,21 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseIncrementTask;
 import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.springframework.jdbc.core.JdbcTemplate;
 
 import java.time.Instant;
+import java.util.Properties;
 import java.util.concurrent.ThreadLocalRandom;
 
 @Slf4j
 @RequiredArgsConstructor
 public final class PostgreSQLIncrementTask extends BaseIncrementTask {
     
-    private final JdbcTemplate jdbcTemplate;
+    private static final KeyGenerateAlgorithm KEY_GENERATE_ALGORITHM = new SnowflakeKeyGenerateAlgorithm();
     
-    private final KeyGenerateAlgorithm keyGenerateAlgorithm;
+    private final JdbcTemplate jdbcTemplate;
     
     private final String schema;
     
@@ -42,6 +44,12 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
     
     private final int executeCountLimit;
     
+    static {
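+        // bound the snowflake vibration offset so generated ids vary in their low-order bits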
+        Properties props = new Properties();
+        props.setProperty("max-vibration-offset", "2");
+        KEY_GENERATE_ALGORITHM.init(props);
+    }
+    
     @Override
     public void run() {
         int executeCount = 0;
@@ -65,7 +73,7 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
     private Object insertOrder() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
-        Object[] orderInsertDate = new Object[]{keyGenerateAlgorithm.generateKey(), ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
+        Object[] orderInsertData = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
         jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order (id,order_id,user_id,status) VALUES (?, ?, ?, ?)", schema), orderInsertData);
         return orderInsertData[0];
     }
@@ -73,7 +81,7 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
     private Object insertOrderItem() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
-        Object[] orderInsertItemDate = new Object[]{keyGenerateAlgorithm.generateKey(), ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
+        Object[] orderInsertItemData = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
         jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)", schema), orderInsertItemData);
         return orderInsertItemData[0];
     }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
similarity index 95%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
index 79cc4b7bc39..95c505b9484 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
@@ -33,7 +33,7 @@ import org.apache.shardingsphere.test.integration.env.runtime.DataSourceEnvironm
 /**
  * Composed container, include governance container and database container.
  */
-public final class DockerComposedContainer extends BaseComposedContainer {
+public final class MigrationComposedContainer extends BaseComposedContainer {
     
     private final DatabaseType databaseType;
     
@@ -45,7 +45,7 @@ public final class DockerComposedContainer extends BaseComposedContainer {
     @Getter
     private final GovernanceContainer governanceContainer;
     
-    public DockerComposedContainer(final DatabaseType databaseType, final String dockerImageName) {
+    public MigrationComposedContainer(final DatabaseType databaseType, final String dockerImageName) {
         this.databaseType = databaseType;
         governanceContainer = getContainers().registerContainer(new ZookeeperContainer());
         storageContainer = getContainers().registerContainer((DockerStorageContainer) StorageContainerFactory.newInstance(databaseType, dockerImageName,
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
index 59e7f49af11..a87773aad14 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
@@ -21,14 +21,13 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.BaseComposedContainer;
-import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.DockerComposedContainer;
+import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.MigrationComposedContainer;
 import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.NativeComposedContainer;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.repository.cluster.zookeeper.CuratorZookeeperRepository;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
-import org.springframework.jdbc.core.JdbcTemplate;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -41,8 +40,6 @@ public class ScalingWatcher extends TestWatcher {
     
     private final BaseComposedContainer composedContainer;
     
-    private final JdbcTemplate jdbcTemplate;
-    
     @Override
     protected void failed(final Throwable e, final Description description) {
         if (composedContainer instanceof NativeComposedContainer) {
@@ -50,17 +47,14 @@ public class ScalingWatcher extends TestWatcher {
             return;
         }
         outputZookeeperData();
-        List<Map<String, Object>> previewList = jdbcTemplate.queryForList("preview select * from t_order");
-        List<Map<String, Object>> shardingAlgorithms = jdbcTemplate.queryForList("SHOW SHARDING ALGORITHMS");
-        log.warn("watcher failed, preview:{}, shardingAlgorithms:{}", previewList, shardingAlgorithms);
     }
     
     private void outputZookeeperData() {
-        DockerComposedContainer dockerComposedContainer = (DockerComposedContainer) composedContainer;
-        DatabaseType databaseType = dockerComposedContainer.getStorageContainer().getDatabaseType();
+        MigrationComposedContainer migrationComposedContainer = (MigrationComposedContainer) composedContainer;
+        DatabaseType databaseType = migrationComposedContainer.getStorageContainer().getDatabaseType();
         String namespace = "it_db_" + databaseType.getType().toLowerCase();
         ClusterPersistRepositoryConfiguration config = new ClusterPersistRepositoryConfiguration("ZooKeeper", namespace,
-                dockerComposedContainer.getGovernanceContainer().getServerLists(), new Properties());
+                migrationComposedContainer.getGovernanceContainer().getServerLists(), new Properties());
         ClusterPersistRepository zookeeperRepository = new CuratorZookeeperRepository();
         zookeeperRepository.init(config);
         List<String> childrenKeys = zookeeperRepository.getChildrenKeys("/");
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
index 4b2d51b76ae..1ba93faaaf2 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
@@ -16,80 +16,15 @@
   -->
 
 <command>
-    <create-database-sharding-algorithm>
-        CREATE SHARDING ALGORITHM database_inline (
-        TYPE(NAME="INLINE",PROPERTIES("algorithm-expression"="ds_${user_id % 2}")))
-    </create-database-sharding-algorithm>
-    
-    <create-order-sharding-algorithm>
-        CREATE SHARDING ALGORITHM t_order_inline (
-        TYPE(NAME="INLINE",PROPERTIES("algorithm-expression"="t_order_${order_id % 2}")))
-    </create-order-sharding-algorithm>
-    
-    <create-order-item-sharding-algorithm>
-        CREATE SHARDING ALGORITHM t_order_item_inline (
-        TYPE(NAME="INLINE",PROPERTIES("algorithm-expression"="t_order_item_${order_id % 2}")))
-    </create-order-item-sharding-algorithm>
-    
-    <create-order-table-rule>
-        CREATE SHARDING TABLE RULE t_order(
-        RESOURCES(ds_0,ds_1),
-        SHARDING_COLUMN=order_id,
-        TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="4")),
-        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
-        )
-    </create-order-table-rule>
-    
-    <create-order-item-table-rule>
-        CREATE SHARDING TABLE RULE t_order_item(
-        RESOURCES(ds_0,ds_1),
-        SHARDING_COLUMN=order_id,
-        TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="4")),
-        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
-        );
-    </create-order-item-table-rule>
-    
-    <alter-sharding-algorithm>
-        ALTER SHARDING ALGORITHM database_inline
-        (TYPE(NAME="INLINE",PROPERTIES("algorithm-expression"="ds_${user_id % 3 + 2}")))
-    </alter-sharding-algorithm>
-    
-    <alter-order-with-item-auto-table-rule>
-        ALTER SHARDING TABLE RULE t_order(
-        RESOURCES(ds_2, ds_3, ds_4),
-        SHARDING_COLUMN=order_id,
-        TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
-        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
-        ),t_order_item(
-        RESOURCES(ds_2, ds_3, ds_4),
-        SHARDING_COLUMN=order_id,
-        TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
-        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
-        )
-    </alter-order-with-item-auto-table-rule>
-    
-    <alter-order-auto-table-rule>
-        ALTER SHARDING TABLE RULE t_order(
-        RESOURCES(ds_2, ds_3, ds_4),
-        SHARDING_COLUMN=order_id,
-        TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
-        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
-        )
-    </alter-order-auto-table-rule>
-    
-    <source-add-resource-template>
-        ADD RESOURCE ds_0 (
+    <add-migration-source-resource-template>
+        ADD MIGRATION SOURCE RESOURCE ds_0 (
         URL="${ds0}",
         USER="${user}",
         PASSWORD="${password}"
-        ), ds_1 (
-        URL="${ds1}",
-        USER="${user}",
-        PASSWORD="${password}"
-        )
-    </source-add-resource-template>
+        );
+    </add-migration-source-resource-template>
     
-    <target-add-resource-template>
+    <add-migration-target-resource-template>
         ADD RESOURCE ds_2 (
         URL="${ds2}",
         USER="${user}",
@@ -103,5 +38,31 @@
         USER="${user}",
         PASSWORD="${password}"
         )
-    </target-add-resource-template>
+    </add-migration-target-resource-template>
+    
+    <create-target-order-table-rule>
+        CREATE SHARDING TABLE RULE t_order(
+        RESOURCES(ds_2,ds_3,ds_4),
+        SHARDING_COLUMN=order_id,
+        TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
+        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
+        )
+    </create-target-order-table-rule>
+    
+    <create-target-order-item-table-rule>
+        CREATE SHARDING TABLE RULE t_order_item(
+        RESOURCES(ds_2,ds_3,ds_4),
+        SHARDING_COLUMN=order_id,
+        TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
+        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
+        );
+    </create-target-order-item-table-rule>
+    
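+    <!-- template: MIGRATE TABLE <source resource>.<table> INTO <target database>.<table> -->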
+    <migration-order-single-table>
+        MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order;
+    </migration-order-single-table>
+    
+    <migration-order-item-single-table>
+        MIGRATE TABLE ds_0.t_order_item INTO sharding_db.t_order_item;
+    </migration-order-item-single-table>
 </command>
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
index 51baa22fcda..13e3417ebb2 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
@@ -26,6 +26,7 @@
     <logger name="org.apache.shardingsphere.test.integration.env.container.atomic.DockerITContainer" level="WARN" />
     <logger name="com.zaxxer.hikari.pool.ProxyConnection" level="OFF" />
     <logger name="com.github.dockerjava" level="WARN"/>
+    <logger name="org.apache.zookeeper.ZooKeeper" level="OFF"/>
     
     <root level="INFO">
         <appender-ref ref="console" />
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 2797c0d23b1..9fbaeaf1a07 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -219,10 +219,9 @@ public final class MigrationJobAPIImplTest {
         jobAPI.persistJobItemProgress(jobItemContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
         jobAPI.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
-        jobAPI.switchClusterConfiguration(jobId.get());
         Map<Integer, InventoryIncrementalJobItemProgress> progress = jobAPI.getJobProgress(jobId.get());
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : progress.entrySet()) {
-            assertSame(entry.getValue().getStatus(), JobStatus.FINISHED);
+            assertSame(entry.getValue().getStatus(), JobStatus.EXECUTE_INVENTORY_TASK);
         }
     }
     
@@ -251,8 +250,8 @@ public final class MigrationJobAPIImplTest {
                 Connection connection = pipelineDataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
-            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
-            statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT(11))");
+            statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 0), (999, 15)");
         }
     }
     
@@ -280,4 +279,10 @@ public final class MigrationJobAPIImplTest {
         Map<String, DataSourceProperties> actual = persistService.load(JobType.MIGRATION);
         assertTrue(actual.containsKey("ds_0"));
     }
+    
+    @Test
+    public void assertCreateJobConfig() {
+        // CreateMigrationJobParameter parameter = new CreateMigrationJobParameter();
+        // jobAPI.createJobConfig(parameter);
+    }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index 4adbd5ddff9..2f0e02f975a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -64,8 +64,8 @@ public final class DataConsistencyCheckerTest {
                 Connection connection = new DefaultPipelineDataSourceManager().getDataSource(dataSourceConfig).getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
-            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
-            statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT(11))");
+            statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 1), (999, 10)");
         }
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePositionInitializer.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePositionInitializer.java
index 04b6d27095a..46622357323 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePositionInitializer.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePositionInitializer.java
@@ -25,7 +25,7 @@ import javax.sql.DataSource;
 public final class FixturePositionInitializer implements PositionInitializer {
     
     @Override
-    public PlaceholderPosition init(final DataSource dataSource) {
+    public PlaceholderPosition init(final DataSource dataSource, final String slotNameSuffix) {
         return new PlaceholderPosition();
     }
     
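
The fixture change above follows a signature change on the PositionInitializer
SPI: init now receives a slotNameSuffix alongside the DataSource. A sketch of
how a database-specific implementation could use it, assuming the suffix feeds
a per-job replication slot name (the "pipeline_" prefix is an assumption, and
other SPI methods are omitted):

    public final class ExamplePositionInitializer implements PositionInitializer {
        
        @Override
        public PlaceholderPosition init(final DataSource dataSource, final String slotNameSuffix) {
            // a real initializer (e.g. the PostgreSQL/openGauss ones touched in
            // this commit) would create or look up a replication slot by this name
            String slotName = "pipeline_" + slotNameSuffix;
            return new PlaceholderPosition();
        }
    }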
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index fd0d8e0ee05..e5818fb9667 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -57,7 +57,7 @@ public final class IncrementalTaskTest {
     @Test
     public void assertStart() {
         incrementalTask.start();
-        assertThat(incrementalTask.getTaskId(), is("ds_0"));
+        assertThat(incrementalTask.getTaskId(), is("standard_0"));
         assertThat(incrementalTask.getTaskProgress().getPosition(), instanceOf(PlaceholderPosition.class));
     }
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 3c8cd165b79..a06e5a3b8f6 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -28,10 +28,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
-
-import java.util.Collections;
-import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Job configuration builder.
@@ -46,15 +42,14 @@ public final class JobConfigurationBuilder {
      */
     public static MigrationJobConfiguration createJobConfiguration() {
         YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
-        result.setDatabaseName("logic_db");
-        result.setAlteredRuleYamlClassNameTablesMap(Collections.singletonMap(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order")));
-        int activeVersion = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
-        result.setActiveVersion(activeVersion);
-        result.setNewVersion(activeVersion + 1);
+        result.setTargetDatabaseName("logic_db");
+        result.setSourceDataSourceName("standard_0");
         // TODO add autoTables in config file
-        result.setSource(createYamlPipelineDataSourceConfiguration(
-                new ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
-        result.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
+        result.setSource(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("migration_standard_jdbc_source.yaml"))));
+        result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration(
+                ConfigurationFileUtil.readFile("migration_sharding_sphere_jdbc_target.yaml"))));
+        result.setSourceTableName("t_order");
+        result.setTargetTableName("t_order");
         PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
         return new YamlMigrationJobConfigurationSwapper().swapToObject(result);
     }
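
A typical consumer of the refactored builder, as a minimal sketch; the getters
are assumed to mirror the YAML setters used above:

    MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
    assertThat(jobConfig.getTargetDatabaseName(), is("logic_db"));
    assertThat(jobConfig.getSourceDataSourceName(), is("standard_0"));
    assertThat(jobConfig.getSourceTableName(), is("t_order"));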
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_sharding_sphere_jdbc_target.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_sharding_sphere_jdbc_target.yaml
new file mode 100644
index 00000000000..784ee92542b
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_sharding_sphere_jdbc_target.yaml
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+dataSources:
+  ds_1:
+    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
+    jdbcUrl: jdbc:h2:mem:test_ds_1;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
+    username: root
+    password: root
+  ds_2:
+    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
+    jdbcUrl: jdbc:h2:mem:test_ds_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
+    username: root
+    password: root
+rules:
+  - !SHARDING
+    defaultDatabaseStrategy:
+      standard:
+        shardingAlgorithmName: default_db_inline
+        shardingColumn: user_id
+    tables:
+      t_order:
+        actualDataNodes: ds_$->{1..2}.t_order_$->{0..1}
+        keyGenerateStrategy:
+          column: order_id
+          keyGeneratorName: snowflake
+        tableStrategy:
+          standard:
+            shardingAlgorithmName: t_order_tbl_inline
+            shardingColumn: order_id
+    shardingAlgorithms:
+      default_db_inline:
+        type: INLINE
+        props:
+          algorithm-expression: ds_$->{user_id % 2 + 1}
+      t_order_tbl_inline:
+        type: INLINE
+        props:
+          algorithm-expression: t_order_$->{order_id % 2}
+
+    keyGenerators:
+      snowflake:
+        type: SNOWFLAKE
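
The two INLINE algorithms above first route a row to a database by user_id,
then to a table by order_id. The equivalent arithmetic as a plain Java sketch:

    // mirrors ds_$->{user_id % 2 + 1} and t_order_$->{order_id % 2}
    static String route(final long userId, final long orderId) {
        String database = "ds_" + (userId % 2 + 1);   // ds_1 or ds_2
        String table = "t_order_" + (orderId % 2);    // t_order_0 or t_order_1
        return database + "." + table;
    }
    // route(15, 999) -> "ds_2.t_order_1", one of the nodes in ds_$->{1..2}.t_order_$->{0..1}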
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_standard_jdbc_source.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_standard_jdbc_source.yaml
new file mode 100644
index 00000000000..ff51c87dc8e
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_standard_jdbc_source.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+jdbcUrl: jdbc:h2:mem:standard_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
+username: root
+password: root
+dataSourceClassName: com.zaxxer.hikari.HikariDataSource
+minimumIdle: 1          # HikariCP property name
+minPoolSize: 1          # generic spelling kept alongside the Hikari one
+maxPoolSize: 50         # generic spelling kept alongside the Hikari one
+maximumPoolSize: 50     # HikariCP property name
+readOnly: false
+idleTimeout: 60000
+connectionTimeout: 30000
+maxLifetime: 1800000
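
This standalone source file pairs with the JobConfigurationBuilder hunk above,
where the job's source moved from a ShardingSphere-JDBC configuration to a
standard JDBC one. It is consumed as:

    // as in JobConfigurationBuilder.createJobConfiguration() earlier in this diff
    StandardPipelineDataSourceConfiguration sourceConfig = new StandardPipelineDataSourceConfiguration(
            ConfigurationFileUtil.readFile("migration_standard_jdbc_source.yaml"));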