Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/29 10:12:09 UTC

[shardingsphere] branch master updated: Refactor JobConfiguration, prepare for reuse and extension by different types of jobs (#17205)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f16c9dc0261 Refactor JobConfiguration, prepare for reuse and extension by different types of jobs (#17205)
f16c9dc0261 is described below

commit f16c9dc0261f50e40c31966b13c42c6bb342934d
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Apr 29 18:11:54 2022 +0800

    Refactor JobConfiguration, prepare for reuse and extension by different types of jobs (#17205)
    
    * Rename JobConfiguration to RuleAlteredJobConfiguration
    
    * Rename jobConfiguration variables to jobConfig
    
    * Move databaseName from workflowConfig to jobConfig
    
    * Move jobId from handleConfig to jobConfig
    
    * Move jobShardingItem from handleConfig to jobConfig
    
    * Merge workflowConfig into jobConfig
    
    * Merge pipelineConfig into jobConfig
    
    * Merge handleConfig into jobConfig (see the sketch below)
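    
    A rough before/after sketch of the consolidation listed above. The two
    halves show the code before and after this commit (so they do not compile
    together); workflowConfig, pipelineConfig, source and target are
    placeholder variables, not part of this commit:
    
        // Before: three nested configs assembled separately.
        JobConfiguration oldConfig = new JobConfiguration(workflowConfig, pipelineConfig);
        oldConfig.buildHandleConfig();
        String oldJobId = oldConfig.getHandleConfig().getJobId();
        
        // After: one flat RuleAlteredJobConfiguration carries all of those fields.
        RuleAlteredJobConfiguration newConfig = new RuleAlteredJobConfiguration();
        newConfig.setDatabaseName("logic_db");
        newConfig.setSource(source);
        newConfig.setTarget(target);
        newConfig.buildHandleConfig();
        String newJobId = newConfig.getJobId();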
---
 ...hardingRuleAlteredJobConfigurationPreparer.java |  52 +++---
 ...ingRuleAlteredJobConfigurationPreparerTest.java |  98 -----------
 .../yaml/scaling/prepare/alter_rule_source.yaml    |  88 ----------
 .../prepare/auto_table_alter_rule_target.yaml      |  94 -----------
 .../scaling/prepare/table_alter_rule_target.yaml   |  94 -----------
 .../data/pipeline/api/RuleAlteredJobAPI.java       |  14 +-
 .../PipelineJobConfiguration.java}                 |  35 ++--
 .../config/rulealtered/HandleConfiguration.java    |  94 -----------
 .../api/config/rulealtered/JobConfiguration.java   |  97 -----------
 .../config/rulealtered/PipelineConfiguration.java  |  59 -------
 .../rulealtered/RuleAlteredJobConfiguration.java   | 184 +++++++++++++++++++++
 .../api/config/rulealtered/TaskConfiguration.java  |   2 +-
 .../config/rulealtered/WorkflowConfiguration.java  |  53 ------
 .../RuleAlteredJobConfigurationPreparer.java       |  17 +-
 ...t.java => RuleAlteredJobConfigurationTest.java} |  30 +---
 .../core/api/impl/GovernanceRepositoryAPIImpl.java |   2 +-
 .../core/api/impl/RuleAlteredJobAPIImpl.java       |  78 +++++----
 .../check/consistency/DataConsistencyChecker.java  |  30 ++--
 .../pipeline/core/execute/PipelineJobExecutor.java |  13 +-
 .../data/pipeline/core/job/FinishedCheckJob.java   |  14 +-
 .../datasource/AbstractDataSourcePreparer.java     |  10 +-
 .../datasource/PrepareTargetTablesParameter.java   |   4 +-
 .../scenario/rulealtered/RuleAlteredJob.java       |   8 +-
 .../rulealtered/RuleAlteredJobContext.java         |  12 +-
 .../rulealtered/RuleAlteredJobPreparer.java        |  30 ++--
 .../rulealtered/RuleAlteredJobScheduler.java       |   4 +-
 .../scenario/rulealtered/RuleAlteredJobWorker.java |  61 +++----
 .../rulealtered/prepare/InventoryTaskSplitter.java |   8 +-
 .../job/environment/ScalingEnvironmentManager.java |  10 +-
 .../datasource/MySQLDataSourcePreparer.java        |   8 +-
 .../datasource/MySQLDataSourcePreparerTest.java    |  18 +-
 .../datasource/OpenGaussDataSourcePreparer.java    |   4 +-
 .../datasource/PostgreSQLDataSourcePreparer.java   |   8 +-
 .../data/pipeline/env/ITEnvironmentContext.java    |  11 +-
 .../core/api/impl/RuleAlteredJobAPIImplTest.java   |  43 +++--
 .../consistency/DataConsistencyCheckerTest.java    |   4 +-
 .../datasource/PipelineDataSourceManagerTest.java  |  10 +-
 .../core/util/JobConfigurationBuilder.java         |  22 +--
 .../scenario/rulealtered/RuleAlteredJobTest.java   |   8 +-
 .../rulealtered/RuleAlteredJobWorkerTest.java      |  15 +-
 .../prepare/InventoryTaskSplitterTest.java         |   4 +-
 41 files changed, 461 insertions(+), 989 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
index 0c47070e413..cc84a00639c 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -22,11 +22,9 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -68,23 +66,21 @@ import java.util.Set;
 public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAlteredJobConfigurationPreparer {
     
     @Override
-    public HandleConfiguration createHandleConfiguration(final PipelineConfiguration pipelineConfig, final WorkflowConfiguration workflowConfig) {
-        HandleConfiguration result = new HandleConfiguration();
-        Map<String, List<DataNode>> shouldScalingActualDataNodes = getShouldScalingActualDataNodes(pipelineConfig, workflowConfig);
-        result.setJobShardingDataNodes(getJobShardingDataNodes(shouldScalingActualDataNodes));
-        result.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet()));
-        result.setTablesFirstDataNodes(getTablesFirstDataNodes(shouldScalingActualDataNodes));
-        return result;
+    public void extendJobConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+        Map<String, List<DataNode>> shouldScalingActualDataNodes = getShouldScalingActualDataNodes(jobConfig);
+        jobConfig.setJobShardingDataNodes(getJobShardingDataNodes(shouldScalingActualDataNodes));
+        jobConfig.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet()));
+        jobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(shouldScalingActualDataNodes));
     }
     
-    private static Map<String, List<DataNode>> getShouldScalingActualDataNodes(final PipelineConfiguration pipelineConfig, final WorkflowConfiguration workflowConfig) {
-        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter());
+    private static Map<String, List<DataNode>> getShouldScalingActualDataNodes(final RuleAlteredJobConfiguration jobConfig) {
+        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
         ShardingSpherePipelineDataSourceConfiguration source = (ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig;
         ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
         ShardingRule shardingRule = new ShardingRule(sourceRuleConfig, source.getRootConfig().getDataSources().keySet());
         Map<String, TableRule> tableRules = shardingRule.getTableRules();
         Map<String, List<DataNode>> result = new LinkedHashMap<>();
-        Set<String> reShardNeededTables = new HashSet<>(workflowConfig.getAlteredRuleYamlClassNameTablesMap().get(YamlShardingRuleConfiguration.class.getName()));
+        Set<String> reShardNeededTables = new HashSet<>(jobConfig.getAlteredRuleYamlClassNameTablesMap().get(YamlShardingRuleConfiguration.class.getName()));
         for (Entry<String, TableRule> entry : tableRules.entrySet()) {
             if (reShardNeededTables.contains(entry.getKey())) {
                 result.put(entry.getKey(), entry.getValue().getActualDataNodes());
@@ -129,13 +125,13 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         return new JobDataNodeLine(dataNodeEntries).marshal();
     }
     
+    // TODO use jobConfig as parameter, jobShardingItem
     @Override
-    public TaskConfiguration createTaskConfiguration(final PipelineConfiguration pipelineConfig, final HandleConfiguration handleConfig,
-                                                     final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
-        ShardingSpherePipelineDataSourceConfiguration sourceConfig = getSourceConfiguration(pipelineConfig);
+    public TaskConfiguration createTaskConfiguration(final RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
+        ShardingSpherePipelineDataSourceConfiguration sourceConfig = getSourceConfiguration(jobConfig);
         ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
         Map<String, DataSourceProperties> dataSourcePropsMap = new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceConfig.getRootConfig());
-        JobDataNodeLine dataNodeLine = JobDataNodeLine.unmarshal(handleConfig.getJobShardingDataNodes().get(handleConfig.getJobShardingItem()));
+        JobDataNodeLine dataNodeLine = JobDataNodeLine.unmarshal(jobConfig.getJobShardingDataNodes().get(jobConfig.getJobShardingItem()));
         String dataSourceName = dataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
         Map<String, String> tableMap = new LinkedHashMap<>();
         for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
@@ -144,20 +140,20 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
             }
         }
         DumperConfiguration dumperConfig = createDumperConfiguration(dataSourceName, dataSourcePropsMap.get(dataSourceName).getAllLocalProperties(), tableMap);
-        Optional<ShardingRuleConfiguration> targetRuleConfigOptional = getTargetRuleConfiguration(pipelineConfig);
-        Map<String, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfigOptional.orElse(sourceRuleConfig), new HashSet<>(handleConfig.splitLogicTableNames()));
-        ImporterConfiguration importerConfig = createImporterConfiguration(pipelineConfig, handleConfig, onRuleAlteredActionConfig, shardingColumnsMap);
-        TaskConfiguration result = new TaskConfiguration(handleConfig, dumperConfig, importerConfig);
+        Optional<ShardingRuleConfiguration> targetRuleConfigOptional = getTargetRuleConfiguration(jobConfig);
+        Map<String, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfigOptional.orElse(sourceRuleConfig), new HashSet<>(jobConfig.splitLogicTableNames()));
+        ImporterConfiguration importerConfig = createImporterConfiguration(jobConfig, onRuleAlteredActionConfig, shardingColumnsMap);
+        TaskConfiguration result = new TaskConfiguration(jobConfig, dumperConfig, importerConfig);
         log.info("createTaskConfiguration, dataSourceName={}, result={}", dataSourceName, result);
         return result;
     }
     
-    private static ShardingSpherePipelineDataSourceConfiguration getSourceConfiguration(final PipelineConfiguration pipelineConfig) {
-        return (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter());
+    private static ShardingSpherePipelineDataSourceConfiguration getSourceConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+        return (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
     }
     
-    private static Optional<ShardingRuleConfiguration> getTargetRuleConfiguration(final PipelineConfiguration pipelineConfig) {
-        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter());
+    private static Optional<ShardingRuleConfiguration> getTargetRuleConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
         if (!(targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration)) {
             return Optional.empty();
         }
@@ -207,11 +203,11 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         return result;
     }
     
-    private static ImporterConfiguration createImporterConfiguration(final PipelineConfiguration pipelineConfig, final HandleConfiguration handleConfig,
+    private static ImporterConfiguration createImporterConfiguration(final RuleAlteredJobConfiguration jobConfig,
                                                                      final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig, final Map<String, Set<String>> shardingColumnsMap) {
-        PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter());
+        PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
         int batchSize = onRuleAlteredActionConfig.getOutput().getBatchSize();
-        int retryTimes = handleConfig.getRetryTimes();
+        int retryTimes = jobConfig.getRetryTimes();
         return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, batchSize, retryTimes);
     }
 }
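
For implementers of the RuleAlteredJobConfigurationPreparer SPI, the change
above is the main takeaway: the former (PipelineConfiguration,
HandleConfiguration, WorkflowConfiguration) parameters collapse into a single
RuleAlteredJobConfiguration. A minimal skeleton under that contract
(CustomJobConfigurationPreparer is a hypothetical name, and the SPI package is
inferred from the factory import visible in this diff):

    import java.util.Collections;
    
    import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
    import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
    import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
    import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
    
    public final class CustomJobConfigurationPreparer implements RuleAlteredJobConfigurationPreparer {
        
        @Override
        public void extendJobConfiguration(final RuleAlteredJobConfiguration jobConfig) {
            // Derive sharding data nodes, logic tables and first data nodes,
            // then write them back into the mutable job configuration.
            jobConfig.setJobShardingDataNodes(Collections.emptyList());
            jobConfig.setLogicTables("");
            jobConfig.setTablesFirstDataNodes("");
        }
        
        @Override
        public TaskConfiguration createTaskConfiguration(final RuleAlteredJobConfiguration jobConfig,
                                                         final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
            // Build dumper and importer configurations from jobConfig alone;
            // nulls are placeholders to keep the skeleton short.
            return new TaskConfiguration(jobConfig, null, null);
        }
    }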
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparerTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparerTest.java
deleted file mode 100644
index a0109d36bb8..00000000000
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparerTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.sharding.schedule;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
-import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class ShardingRuleAlteredJobConfigurationPreparerTest {
-    
-    @Mock
-    private OnRuleAlteredActionConfiguration mockOnRuleAlteredActionConfiguration;
-    
-    private WorkflowConfiguration workflowConfiguration;
-    
-    @Before
-    public void setUp() {
-        workflowConfiguration = new WorkflowConfiguration("logic_db", ImmutableMap.of(YamlShardingRuleConfiguration.class.getName(),
-                Collections.singletonList("t_order")), 0, 1);
-        when(mockOnRuleAlteredActionConfiguration.getOutput()).thenReturn(new OutputConfiguration(5, 1000, null));
-    }
-    
-    @Test
-    public void assertAutoTableRuleCreateTaskConfiguration() throws IOException {
-        PipelineConfiguration pipelineConfiguration = new PipelineConfiguration();
-        setPipelineConfigurationSource(pipelineConfiguration);
-        URL targetUrl = ShardingRuleAlteredJobConfigurationPreparerTest.class.getClassLoader().getResource("yaml/scaling/prepare/auto_table_alter_rule_target.yaml");
-        assertNotNull(targetUrl);
-        YamlPipelineDataSourceConfiguration target = YamlEngine.unmarshal(new File(targetUrl.getFile()), YamlPipelineDataSourceConfiguration.class);
-        pipelineConfiguration.setTarget(target);
-        JobConfiguration jobConfiguration = new JobConfiguration(workflowConfiguration, pipelineConfiguration);
-        jobConfiguration.buildHandleConfig();
-        ShardingRuleAlteredJobConfigurationPreparer preparer = new ShardingRuleAlteredJobConfigurationPreparer();
-        TaskConfiguration taskConfiguration = preparer.createTaskConfiguration(pipelineConfiguration, jobConfiguration.getHandleConfig(), mockOnRuleAlteredActionConfiguration);
-        assertEquals(taskConfiguration.getHandleConfig().getLogicTables(), "t_order");
-    }
-    
-    @Test
-    public void assertTableRuleCreateTaskConfiguration() throws IOException {
-        PipelineConfiguration pipelineConfiguration = new PipelineConfiguration();
-        setPipelineConfigurationSource(pipelineConfiguration);
-        URL targetUrl = ShardingRuleAlteredJobConfigurationPreparerTest.class.getClassLoader().getResource("yaml/scaling/prepare/table_alter_rule_target.yaml");
-        assertNotNull(targetUrl);
-        YamlPipelineDataSourceConfiguration target = YamlEngine.unmarshal(new File(targetUrl.getFile()), YamlPipelineDataSourceConfiguration.class);
-        pipelineConfiguration.setTarget(target);
-        JobConfiguration jobConfiguration = new JobConfiguration(workflowConfiguration, pipelineConfiguration);
-        jobConfiguration.buildHandleConfig();
-        ShardingRuleAlteredJobConfigurationPreparer preparer = new ShardingRuleAlteredJobConfigurationPreparer();
-        TaskConfiguration taskConfiguration = preparer.createTaskConfiguration(pipelineConfiguration, jobConfiguration.getHandleConfig(), mockOnRuleAlteredActionConfiguration);
-        assertThat(taskConfiguration.getHandleConfig().getLogicTables(), is("t_order"));
-    }
-    
-    private void setPipelineConfigurationSource(final PipelineConfiguration pipelineConfiguration) throws IOException {
-        URL sourceUrl = ShardingRuleAlteredJobConfigurationPreparerTest.class.getClassLoader().getResource("yaml/scaling/prepare/alter_rule_source.yaml");
-        assertNotNull(sourceUrl);
-        YamlPipelineDataSourceConfiguration source = YamlEngine.unmarshal(new File(sourceUrl.getFile()), YamlPipelineDataSourceConfiguration.class);
-        pipelineConfiguration.setSource(source);
-    }
-}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/alter_rule_source.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/alter_rule_source.yaml
deleted file mode 100644
index ba874588719..00000000000
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/alter_rule_source.yaml
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-parameter: |
-  dataSources:
-    ds_0:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_1:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_1;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_2:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_3:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_3;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_4:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_4;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-  rules:
-  - !SHARDING
-    defaultDatabaseStrategy:
-      standard:
-        shardingAlgorithmName: database_inline
-        shardingColumn: user_id
-    defaultTableStrategy:
-      none: ''
-    keyGenerators:
-      snowflake:
-        type: SNOWFLAKE
-    scaling:
-      default_scaling:
-        completionDetector:
-          props:
-            incremental-task-idle-minute-threshold: 30
-          type: IDLE
-        dataConsistencyChecker:
-          props:
-            chunk-size: 1000
-          type: DATA_MATCH
-        input:
-          batchSize: 1000
-          workerThread: 40
-        output:
-          batchSize: 1000
-          workerThread: 40
-        streamChannel:
-          props:
-            block-queue-size: 10000
-          type: MEMORY
-    scalingName: default_scaling
-    shardingAlgorithms:
-      database_inline:
-        props:
-          algorithm-expression: ds_${user_id % 2}
-        type: INLINE
-      t_order_inline:
-        props:
-          algorithm-expression: t_order_${order_id % 2}
-        type: INLINE
-    tables:
-      t_order:
-        actualDataNodes: ds_${0..1}.t_order_${0..1}
-        keyGenerateStrategy:
-          column: order_id
-          keyGeneratorName: snowflake
-        logicTable: t_order
-        tableStrategy:
-          standard:
-            shardingAlgorithmName: t_order_inline
-            shardingColumn: order_id
-  schemaName: sharding_db
-type: ShardingSphereJDBC
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/auto_table_alter_rule_target.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/auto_table_alter_rule_target.yaml
deleted file mode 100644
index b1bfb8e7ebc..00000000000
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/auto_table_alter_rule_target.yaml
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-parameter: |
-  dataSources:
-    ds_0:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_1:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_1;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_2:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_3:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_3;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_4:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_4;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-  rules:
-  - !SHARDING
-    autoTables:
-      t_order:
-        actualDataSources: ds_2,ds_3,ds_4
-        keyGenerateStrategy:
-          column: order_id
-          keyGeneratorName: t_order_snowflake
-        logicTable: t_order
-        shardingStrategy:
-          standard:
-            shardingAlgorithmName: t_order_hash_mod
-            shardingColumn: order_id
-    defaultDatabaseStrategy:
-      standard:
-        shardingAlgorithmName: database_inline
-        shardingColumn: user_id
-    defaultTableStrategy:
-      none: ''
-    keyGenerators:
-      snowflake:
-        type: SNOWFLAKE
-      t_order_snowflake:
-        type: snowflake
-    scaling:
-      default_scaling:
-        completionDetector:
-          props:
-            incremental-task-idle-minute-threshold: 30
-          type: IDLE
-        dataConsistencyChecker:
-          props:
-            chunk-size: 1000
-          type: DATA_MATCH
-        input:
-          batchSize: 1000
-          workerThread: 40
-        output:
-          batchSize: 1000
-          workerThread: 40
-        streamChannel:
-          props:
-            block-queue-size: 10000
-          type: MEMORY
-    scalingName: default_scaling
-    shardingAlgorithms:
-      database_inline:
-        props:
-          algorithm-expression: ds_${user_id % 2}
-        type: INLINE
-      t_order_inline:
-        props:
-          algorithm-expression: t_order_${order_id % 2}
-        type: INLINE
-      t_order_hash_mod:
-        props:
-          sharding-count: '6'
-        type: hash_mod
-  schemaName: sharding_db
-type: ShardingSphereJDBC
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/table_alter_rule_target.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/table_alter_rule_target.yaml
deleted file mode 100644
index 9856cced6ef..00000000000
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/table_alter_rule_target.yaml
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-parameter: |
-  dataSources:
-    ds_0:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_1:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_1;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_2:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_3:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_3;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-    ds_4:
-      dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
-      jdbcUrl: jdbc:h2:mem:test_ds_4;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
-  rules:
-  - !SHARDING
-    defaultDatabaseStrategy:
-      standard:
-        shardingAlgorithmName: database_inline
-        shardingColumn: user_id
-    defaultTableStrategy:
-      none: ''
-    keyGenerators:
-      snowflake:
-        type: SNOWFLAKE
-      t_order_snowflake:
-        type: snowflake
-    scaling:
-      default_scaling:
-        completionDetector:
-          props:
-            incremental-task-idle-minute-threshold: 30
-          type: IDLE
-        dataConsistencyChecker:
-          props:
-            chunk-size: 1000
-          type: DATA_MATCH
-        input:
-          batchSize: 1000
-          workerThread: 40
-        output:
-          batchSize: 1000
-          workerThread: 40
-        streamChannel:
-          props:
-            block-queue-size: 10000
-          type: MEMORY
-    scalingName: default_scaling
-    shardingAlgorithms:
-      database_inline:
-        props:
-          algorithm-expression: ds_${user_id % 2}
-        type: INLINE
-      t_order_inline:
-        props:
-          algorithm-expression: t_order_${order_id % 2}
-        type: INLINE
-    tables:
-      t_order:
-        actualDataNodes: ds_${2..4}.t_order_${0..1}
-        databaseStrategy:
-          standard:
-            shardingAlgorithmName: database_inline
-            shardingColumn: user_id
-        keyGenerateStrategy:
-          column: order_id
-          keyGeneratorName: t_order_snowflake
-        logicTable: t_order
-        tableStrategy:
-          standard:
-            shardingAlgorithmName: t_order_inline
-            shardingColumn: order_id
-  schemaName: sharding_db
-type: ShardingSphereJDBC
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index 56163342135..3f6774a65d6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
@@ -47,7 +47,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      * @param jobConfig job config
      * @return job id
      */
-    Optional<String> start(JobConfiguration jobConfig);
+    Optional<String> start(RuleAlteredJobConfiguration jobConfig);
     
     /**
      * Get job progress.
@@ -63,7 +63,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      * @param jobConfig job configuration
      * @return each sharding item progress
      */
-    Map<Integer, JobProgress> getProgress(JobConfiguration jobConfig);
+    Map<Integer, JobProgress> getProgress(RuleAlteredJobConfiguration jobConfig);
     
     /**
      * Stop cluster writing.
@@ -116,7 +116,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      * @param jobConfig job configuration
      * @return data consistency check needed or not
      */
-    boolean isDataConsistencyCheckNeeded(JobConfiguration jobConfig);
+    boolean isDataConsistencyCheckNeeded(RuleAlteredJobConfiguration jobConfig);
     
     /**
      * Do data consistency check.
@@ -132,7 +132,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      * @param jobConfig job configuration
      * @return each logic table check result
      */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(JobConfiguration jobConfig);
+    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(RuleAlteredJobConfiguration jobConfig);
     
     /**
      * Do data consistency check.
@@ -164,7 +164,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      *
      * @param jobConfig job configuration
      */
-    void switchClusterConfiguration(JobConfiguration jobConfig);
+    void switchClusterConfiguration(RuleAlteredJobConfiguration jobConfig);
     
     /**
      * Reset scaling job.
@@ -179,5 +179,5 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      * @param jobId job id
      * @return job configuration
      */
-    JobConfiguration getJobConfig(String jobId);
+    RuleAlteredJobConfiguration getJobConfig(String jobId);
 }
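
A hedged sketch of a caller migrating to the new signatures above; only method
names shown in this interface diff are used, and the jobAPI instance is assumed
to be obtained elsewhere:

    import java.util.Map;
    import java.util.Optional;
    
    import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
    import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
    import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
    
    final class RuleAlteredJobApiUsage {
        
        static void runAndVerify(final RuleAlteredJobAPI jobAPI, final RuleAlteredJobConfiguration jobConfig) {
            // start, getProgress and dataConsistencyCheck all take the merged config now.
            Optional<String> jobId = jobAPI.start(jobConfig);
            Map<Integer, JobProgress> progress = jobAPI.getProgress(jobConfig);
            if (jobAPI.isDataConsistencyCheckNeeded(jobConfig)) {
                jobAPI.dataConsistencyCheck(jobConfig);
            }
            jobAPI.switchClusterConfiguration(jobConfig);
        }
    }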
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
similarity index 61%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
index ad8687c4818..932fcf17537 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
@@ -15,24 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+package org.apache.shardingsphere.data.pipeline.api.config.job;
 
 /**
- * Task configuration.
+ * Pipeline job configuration.
  */
-@Getter
-@RequiredArgsConstructor
-@ToString
-public final class TaskConfiguration {
+public interface PipelineJobConfiguration {
     
-    private final HandleConfiguration handleConfig;
+    /**
+     * Get job id.
+     *
+     * @return job id
+     */
+    String getJobId();
     
-    private final DumperConfiguration dumperConfig;
+    /**
+     * Get database name.
+     *
+     * @return database name
+     */
+    String getDatabaseName();
     
-    private final ImporterConfiguration importerConfig;
+    /**
+     * Get job sharding item.
+     *
+     * @return job sharding item
+     */
+    Integer getJobShardingItem();
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java
deleted file mode 100644
index 6762e690b32..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import com.google.common.base.Splitter;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
-
-import java.util.List;
-
-/**
- * Handle configuration.
- */
-@NoArgsConstructor
-@Getter
-@Setter
-@ToString
-// TODO rename
-public final class HandleConfiguration {
-    
-    private String jobId;
-    
-    private int concurrency = 3;
-    
-    private int retryTimes = 3;
-    
-    /**
-     * Collection of each logic table's first data node.
-     * <p>
-     * If <pre>actualDataNodes: ds_${0..1}.t_order_${0..1}</pre> and <pre>actualDataNodes: ds_${0..1}.t_order_item_${0..1}</pre>,
-     * then value may be: {@code t_order:ds_0.t_order_0|t_order_item:ds_0.t_order_item_0}.
-     * </p>
-     */
-    private String tablesFirstDataNodes;
-    
-    private List<String> jobShardingDataNodes;
-    
-    private String logicTables;
-    
-    /**
-     * Job sharding item.
-     */
-    private Integer jobShardingItem;
-    
-    private int shardingSize = 1000 * 10000;
-    
-    private String sourceDatabaseType;
-    
-    private String targetDatabaseType;
-    
-    /**
-     * Get job sharding count.
-     *
-     * @return job sharding count
-     */
-    public int getJobShardingCount() {
-        return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
-    }
-    
-    /**
-     * Split {@linkplain #logicTables} to logic table names.
-     *
-     * @return logic table names
-     */
-    public List<String> splitLogicTableNames() {
-        return Splitter.on(',').splitToList(logicTables);
-    }
-    
-    /**
-     * Get job ID digest.
-     * 
-     * @return job ID digest
-     */
-    public String getJobIdDigest() {
-        return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/JobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/JobConfiguration.java
deleted file mode 100644
index 310f35471fa..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/JobConfiguration.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import com.google.common.base.Strings;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
-
-import java.util.Collections;
-
-/**
- * Scaling job configuration.
- */
-@NoArgsConstructor
-@AllArgsConstructor
-@Getter
-@Setter
-@Slf4j
-// TODO share for totally new scenario
-public final class JobConfiguration {
-    
-    private WorkflowConfiguration workflowConfig;
-    
-    private PipelineConfiguration pipelineConfig;
-    
-    private HandleConfiguration handleConfig;
-    
-    public JobConfiguration(final WorkflowConfiguration workflowConfig, final PipelineConfiguration pipelineConfig) {
-        this.workflowConfig = workflowConfig;
-        this.pipelineConfig = pipelineConfig;
-    }
-    
-    /**
-     * Build handle configuration.
-     */
-    public void buildHandleConfig() {
-        PipelineConfiguration pipelineConfig = getPipelineConfig();
-        HandleConfiguration handleConfig = getHandleConfig();
-        if (null == handleConfig || null == handleConfig.getJobShardingDataNodes()) {
-            handleConfig = RuleAlteredJobConfigurationPreparerFactory.newInstance().createHandleConfiguration(pipelineConfig, getWorkflowConfig());
-            this.handleConfig = handleConfig;
-        }
-        if (null == handleConfig.getJobId()) {
-            handleConfig.setJobId(generateJobId());
-        }
-        if (Strings.isNullOrEmpty(handleConfig.getSourceDatabaseType())) {
-            PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
-                    pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter());
-            handleConfig.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getName());
-        }
-        if (Strings.isNullOrEmpty(handleConfig.getTargetDatabaseType())) {
-            PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
-                    pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter());
-            handleConfig.setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getName());
-        }
-        if (null == handleConfig.getJobShardingItem()) {
-            handleConfig.setJobShardingItem(0);
-        }
-    }
-    
-    private String generateJobId() {
-        RuleAlteredJobId jobId = new RuleAlteredJobId();
-        // TODO type, subTypes
-        jobId.setType(JobType.RULE_ALTERED.getValue());
-        jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
-        jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
-        WorkflowConfiguration workflowConfig = getWorkflowConfig();
-        jobId.setCurrentMetadataVersion(workflowConfig.getActiveVersion());
-        jobId.setNewMetadataVersion(workflowConfig.getNewVersion());
-        jobId.setDatabaseName(workflowConfig.getDatabaseName());
-        return jobId.marshal();
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/PipelineConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/PipelineConfiguration.java
deleted file mode 100644
index 0ecb924a6f0..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/PipelineConfiguration.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import com.google.common.base.Preconditions;
-import lombok.Getter;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-
-/**
- * Pipeline configuration.
- */
-@Getter
-public final class PipelineConfiguration {
-    
-    private YamlPipelineDataSourceConfiguration source;
-    
-    private YamlPipelineDataSourceConfiguration target;
-    
-    /**
-     * Set source.
-     *
-     * @param source source configuration
-     */
-    public void setSource(final YamlPipelineDataSourceConfiguration source) {
-        checkParameters(source);
-        this.source = source;
-    }
-    
-    /**
-     * Set target.
-     *
-     * @param target target configuration
-     */
-    public void setTarget(final YamlPipelineDataSourceConfiguration target) {
-        checkParameters(target);
-        this.target = target;
-    }
-    
-    private void checkParameters(final YamlPipelineDataSourceConfiguration yamlConfig) {
-        Preconditions.checkNotNull(yamlConfig);
-        Preconditions.checkNotNull(yamlConfig.getType());
-        Preconditions.checkNotNull(yamlConfig.getParameter());
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
new file mode 100644
index 00000000000..ac0bc618ca7
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
+import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scaling job configuration.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+@Slf4j
+// TODO share this for totally new scenarios
+// TODO rename to Yaml, add config class
+public final class RuleAlteredJobConfiguration implements PipelineJobConfiguration {
+    
+    private String jobId;
+    
+    private String databaseName;
+    
+    // TODO it should not be put in jobConfig since it's mutable
+    private Integer jobShardingItem;
+    
+    /**
+     * Map{altered rule yaml class name, re-shard needed table names}.
+     */
+    private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
+    
+    private Integer activeVersion;
+    
+    private Integer newVersion;
+    
+    private YamlPipelineDataSourceConfiguration source;
+    
+    private YamlPipelineDataSourceConfiguration target;
+    
+    private int concurrency = 3;
+    
+    private int retryTimes = 3;
+    
+    /**
+     * Collection of each logic table's first data node.
+     * <p>
+     * If <pre>actualDataNodes: ds_${0..1}.t_order_${0..1}</pre> and <pre>actualDataNodes: ds_${0..1}.t_order_item_${0..1}</pre>,
+     * then value may be: {@code t_order:ds_0.t_order_0|t_order_item:ds_0.t_order_item_0}.
+     * </p>
+     */
+    private String tablesFirstDataNodes;
+    
+    private List<String> jobShardingDataNodes;
+    
+    private String logicTables;
+    
+    // TODO shardingSize should be configurable
+    private int shardingSize = 1000 * 10000;
+    
+    private String sourceDatabaseType;
+    
+    private String targetDatabaseType;
+    
+    /**
+     * Set source.
+     *
+     * @param source source configuration
+     */
+    public void setSource(final YamlPipelineDataSourceConfiguration source) {
+        checkParameters(source);
+        this.source = source;
+    }
+    
+    /**
+     * Set target.
+     *
+     * @param target target configuration
+     */
+    public void setTarget(final YamlPipelineDataSourceConfiguration target) {
+        checkParameters(target);
+        this.target = target;
+    }
+    
+    private void checkParameters(final YamlPipelineDataSourceConfiguration yamlConfig) {
+        Preconditions.checkNotNull(yamlConfig);
+        Preconditions.checkNotNull(yamlConfig.getType());
+        Preconditions.checkNotNull(yamlConfig.getParameter());
+    }
+    
+    /**
+     * Get job sharding count.
+     *
+     * @return job sharding count
+     */
+    public int getJobShardingCount() {
+        return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
+    }
+    
+    /**
+     * Split {@linkplain #logicTables} to logic table names.
+     *
+     * @return logic table names
+     */
+    public List<String> splitLogicTableNames() {
+        return Splitter.on(',').splitToList(logicTables);
+    }
+    
+    /**
+     * Build handle configuration.
+     */
+    public void buildHandleConfig() {
+        if (null == getJobShardingDataNodes()) {
+            RuleAlteredJobConfigurationPreparerFactory.newInstance().extendJobConfiguration(this);
+        }
+        if (null == jobId) {
+            jobId = generateJobId();
+        }
+        if (Strings.isNullOrEmpty(getSourceDatabaseType())) {
+            PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter());
+            setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getName());
+        }
+        if (Strings.isNullOrEmpty(getTargetDatabaseType())) {
+            PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter());
+            setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getName());
+        }
+        if (null == jobShardingItem) {
+            jobShardingItem = 0;
+        }
+    }
+    
+    private String generateJobId() {
+        RuleAlteredJobId jobId = new RuleAlteredJobId();
+        // TODO type, subTypes
+        jobId.setType(JobType.RULE_ALTERED.getValue());
+        jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
+        jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
+        jobId.setCurrentMetadataVersion(activeVersion);
+        jobId.setNewMetadataVersion(newVersion);
+        jobId.setDatabaseName(databaseName);
+        return jobId.marshal();
+    }
+    
+    @Override
+    public String toString() {
+        return "RuleAlteredJobConfiguration{"
+                + "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\''
+                + ", activeVersion=" + activeVersion + ", newVersion=" + newVersion + ", shardingSize=" + shardingSize
+                + ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\''
+                + '}';
+    }
+}
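
(For orientation, a minimal sketch of how the consolidated configuration might be populated before a job is started. Field and setter names follow the class above and assume its Lombok-generated accessors; the data source variables are placeholders.)

    RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
    jobConfig.setDatabaseName("sharding_db");
    jobConfig.setActiveVersion(0);
    jobConfig.setNewVersion(1);
    // setSource/setTarget require a non-null type and parameter, see checkParameters above
    jobConfig.setSource(sourceYamlDataSourceConfig);
    jobConfig.setTarget(targetYamlDataSourceConfig);
    // fills jobId, database types and job sharding item when they are absent
    jobConfig.buildHandleConfig();
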
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
index ad8687c4818..e2f6e10b356 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 @ToString
 public final class TaskConfiguration {
     
-    private final HandleConfiguration handleConfig;
+    private final RuleAlteredJobConfiguration jobConfig;
     
     private final DumperConfiguration dumperConfig;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/WorkflowConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/WorkflowConfiguration.java
deleted file mode 100644
index 11dc65ad923..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/WorkflowConfiguration.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
-
-import java.util.List;
-import java.util.Map;
-
-@NoArgsConstructor
-@Getter
-@Setter
-@ToString
-public final class WorkflowConfiguration {
-    
-    private long allowDelayMilliseconds = 60 * 1000L;
-    
-    private String databaseName;
-    
-    /**
-     * Map{altered rule yaml class name, re-shard needed table names}.
-     */
-    private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
-    
-    private Integer activeVersion;
-    
-    private Integer newVersion;
-    
-    public WorkflowConfiguration(final String databaseName, final Map<String, List<String>> alteredRuleYamlClassNameTablesMap, final int activeVersion, final int newVersion) {
-        this.databaseName = databaseName;
-        this.alteredRuleYamlClassNameTablesMap = alteredRuleYamlClassNameTablesMap;
-        this.activeVersion = activeVersion;
-        this.newVersion = newVersion;
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
index 557c86fac46..6d47590087d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
@@ -17,10 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
 
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.spi.type.required.RequiredSPI;
 
@@ -30,21 +28,18 @@ import org.apache.shardingsphere.spi.type.required.RequiredSPI;
 public interface RuleAlteredJobConfigurationPreparer extends RequiredSPI {
     
     /**
-     * Create handle configuration, used to build job configuration.
+     * Extend job configuration.
      *
-     * @param pipelineConfig pipeline configuration
-     * @param workflowConfig workflow configuration
-     * @return handle configuration
+     * @param jobConfig job configuration
      */
-    HandleConfiguration createHandleConfiguration(PipelineConfiguration pipelineConfig, WorkflowConfiguration workflowConfig);
+    void extendJobConfiguration(RuleAlteredJobConfiguration jobConfig);
     
     /**
      * Create task configuration, used by underlying scheduler.
      *
-     * @param pipelineConfig pipeline configuration
-     * @param handleConfig handle configuration
+     * @param jobConfig job configuration
      * @param onRuleAlteredActionConfig action configuration
      * @return task configuration
      */
-    TaskConfiguration createTaskConfiguration(PipelineConfiguration pipelineConfig, HandleConfiguration handleConfig, OnRuleAlteredActionConfiguration onRuleAlteredActionConfig);
+    TaskConfiguration createTaskConfiguration(RuleAlteredJobConfiguration jobConfig, OnRuleAlteredActionConfiguration onRuleAlteredActionConfig);
 }
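
(A skeleton of the slimmed-down SPI contract, for illustration only; it assumes TaskConfiguration's required-args constructor over its three final fields, and the dumper/importer setup is elided.)

    public final class ExampleJobConfigurationPreparer implements RuleAlteredJobConfigurationPreparer {
        
        @Override
        public void extendJobConfiguration(final RuleAlteredJobConfiguration jobConfig) {
            // derive tablesFirstDataNodes, jobShardingDataNodes and logicTables
            // from the rule configuration and set them on jobConfig
        }
        
        @Override
        public TaskConfiguration createTaskConfiguration(final RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
            // build dumper and importer configurations from the single jobConfig argument, then:
            return new TaskConfiguration(jobConfig, dumperConfig, importerConfig);
        }
    }
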
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
similarity index 53%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfigurationTest.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
index 8b51502ad7a..e35d3436abd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfigurationTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
@@ -25,38 +25,24 @@ import java.util.Arrays;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
-public final class HandleConfigurationTest {
+public final class RuleAlteredJobConfigurationTest {
     
     @Test
     public void assertGetJobShardingCountByNull() {
-        assertThat(new HandleConfiguration().getJobShardingCount(), is(0));
+        assertThat(new RuleAlteredJobConfiguration().getJobShardingCount(), is(0));
     }
     
     @Test
     public void assertGetJobShardingCount() {
-        HandleConfiguration handleConfig = new HandleConfiguration();
-        handleConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
-        assertThat(handleConfig.getJobShardingCount(), is(2));
+        RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
+        jobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
+        assertThat(jobConfig.getJobShardingCount(), is(2));
     }
     
     @Test
     public void assertSplitLogicTableNames() {
-        HandleConfiguration handleConfig = new HandleConfiguration();
-        handleConfig.setLogicTables("foo_tbl,bar_tbl");
-        assertThat(handleConfig.splitLogicTableNames(), is(Lists.newArrayList("foo_tbl", "bar_tbl")));
-    }
-    
-    @Test
-    public void assertGetJobIdDigestByLongName() {
-        HandleConfiguration handleConfig = new HandleConfiguration();
-        handleConfig.setJobId("abcdefg");
-        assertThat(handleConfig.getJobIdDigest(), is("abcdef"));
-    }
-    
-    @Test
-    public void assertGetJobIdDigestByShortName() {
-        HandleConfiguration handleConfiguration = new HandleConfiguration();
-        handleConfiguration.setJobId("abcdef");
-        assertThat(handleConfiguration.getJobIdDigest(), is("abcdef"));
+        RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
+        jobConfig.setLogicTables("foo_tbl,bar_tbl");
+        assertThat(jobConfig.splitLogicTableNames(), is(Lists.newArrayList("foo_tbl", "bar_tbl")));
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 384ea534d12..61ae24f551f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -61,7 +61,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     public void persistJobProgress(final RuleAlteredJobContext jobContext) {
         JobProgress jobProgress = new JobProgress();
         jobProgress.setStatus(jobContext.getStatus());
-        jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getHandleConfig().getSourceDatabaseType());
+        jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getSourceDatabaseType());
         jobProgress.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
         jobProgress.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
         String value = YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(jobProgress));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 5edfcd16711..aca7e399b82 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -21,8 +21,7 @@ import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -91,10 +90,10 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     private JobInfo getJobInfo(final String jobName) {
         JobInfo result = new JobInfo(jobName);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId());
-        JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+        RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
         result.setActive(!jobConfigPOJO.isDisabled());
-        result.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount());
-        result.setTables(jobConfig.getHandleConfig().getLogicTables());
+        result.setShardingTotalCount(jobConfig.getJobShardingCount());
+        result.setTables(jobConfig.getLogicTables());
         result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
         result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
         result.setJobParameter(jobConfigPOJO.getJobParameter());
@@ -102,15 +101,15 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     }
     
     @Override
-    public Optional<String> start(final JobConfiguration jobConfig) {
+    public Optional<String> start(final RuleAlteredJobConfiguration jobConfig) {
         jobConfig.buildHandleConfig();
-        if (jobConfig.getHandleConfig().getJobShardingCount() == 0) {
+        if (jobConfig.getJobShardingCount() == 0) {
             log.warn("Invalid scaling job config!");
             throw new PipelineJobCreationException("handleConfig shardingTotalCount is 0");
         }
-        log.info("Start scaling job by {}", jobConfig.getHandleConfig());
+        log.info("Start scaling job by {}", jobConfig);
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
-        String jobId = jobConfig.getHandleConfig().getJobId();
+        String jobId = jobConfig.getJobId();
         String jobConfigKey = String.format("%s/%s/config", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId);
         if (repositoryAPI.isExisted(jobConfigKey)) {
             log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
@@ -121,10 +120,10 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         return Optional.of(jobId);
     }
     
-    private String createJobConfig(final JobConfiguration jobConfig) {
+    private String createJobConfig(final RuleAlteredJobConfiguration jobConfig) {
         JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
-        jobConfigPOJO.setJobName(jobConfig.getHandleConfig().getJobId());
-        jobConfigPOJO.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount());
+        jobConfigPOJO.setJobName(jobConfig.getJobId());
+        jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
         jobConfigPOJO.setJobParameter(YamlEngine.marshal(jobConfig));
         jobConfigPOJO.getProps().setProperty("create_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
         return YamlEngine.marshal(jobConfigPOJO);
@@ -137,10 +136,10 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     }
     
     @Override
-    public Map<Integer, JobProgress> getProgress(final JobConfiguration jobConfig) {
-        String jobId = jobConfig.getHandleConfig().getJobId();
+    public Map<Integer, JobProgress> getProgress(final RuleAlteredJobConfiguration jobConfig) {
+        String jobId = jobConfig.getJobId();
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        return IntStream.range(0, jobConfig.getHandleConfig().getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
+        return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
             JobProgress jobProgress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, each);
             if (null != jobProgress) {
                 jobProgress.setActive(!jobConfigPOJO.isDisabled());
@@ -149,22 +148,22 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         }, LinkedHashMap::putAll);
     }
     
-    private void verifyManualMode(final JobConfiguration jobConfig) {
+    private void verifyManualMode(final RuleAlteredJobConfiguration jobConfig) {
         RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         if (null != ruleAlteredContext.getCompletionDetectAlgorithm()) {
             throw new PipelineVerifyFailedException("It's not necessary to do it in auto mode.");
         }
     }
     
-    private void verifyJobNotCompleted(final JobConfiguration jobConfig) {
-        if (RuleAlteredJobProgressDetector.isJobCompleted(jobConfig.getHandleConfig().getJobShardingCount(), getProgress(jobConfig).values())) {
+    private void verifyJobNotCompleted(final RuleAlteredJobConfiguration jobConfig) {
+        if (RuleAlteredJobProgressDetector.isJobCompleted(jobConfig.getJobShardingCount(), getProgress(jobConfig).values())) {
             throw new PipelineVerifyFailedException("Job is completed, it's not necessary to do it.");
         }
     }
     
-    private void verifySourceWritingStopped(final JobConfiguration jobConfig) {
+    private void verifySourceWritingStopped(final RuleAlteredJobConfiguration jobConfig) {
         LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
-        String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+        String databaseName = jobConfig.getDatabaseName();
         ShardingSphereLock lock = lockContext.getGlobalLock(databaseName);
         if (null == lock || !lock.isLocked()) {
             throw new PipelineVerifyFailedException("Source writing is not stopped. You could run `STOP SCALING SOURCE WRITING {jobId}` to stop it.");
@@ -176,11 +175,11 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         checkModeConfig();
         log.info("stopClusterWriteDB for job {}", jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+        RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
         verifyManualMode(jobConfig);
         verifyJobNotStopped(jobConfigPOJO);
         verifyJobNotCompleted(jobConfig);
-        String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+        String databaseName = jobConfig.getDatabaseName();
         stopClusterWriteDB(databaseName, jobId);
     }
     
@@ -204,9 +203,9 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         checkModeConfig();
         log.info("restoreClusterWriteDB for job {}", jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+        RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
         verifyManualMode(jobConfig);
-        String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+        String databaseName = jobConfig.getDatabaseName();
         restoreClusterWriteDB(databaseName, jobId);
     }
     
@@ -242,12 +241,12 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     @Override
     public boolean isDataConsistencyCheckNeeded(final String jobId) {
         log.info("isDataConsistencyCheckNeeded for job {}", jobId);
-        JobConfiguration jobConfig = getJobConfig(jobId);
+        RuleAlteredJobConfiguration jobConfig = getJobConfig(jobId);
         return isDataConsistencyCheckNeeded(jobConfig);
     }
     
     @Override
-    public boolean isDataConsistencyCheckNeeded(final JobConfiguration jobConfig) {
+    public boolean isDataConsistencyCheckNeeded(final RuleAlteredJobConfiguration jobConfig) {
         RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         return isDataConsistencyCheckNeeded(ruleAlteredContext);
     }
@@ -260,13 +259,13 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
         checkModeConfig();
         log.info("Data consistency check for job {}", jobId);
-        JobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
+        RuleAlteredJobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
         verifyDataConsistencyCheck(jobConfig);
         return dataConsistencyCheck(jobConfig);
     }
     
     @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final JobConfiguration jobConfig) {
+    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig) {
         RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         if (!isDataConsistencyCheckNeeded(ruleAlteredContext)) {
             log.info("DataConsistencyCalculatorAlgorithm is not configured, data consistency check is ignored.");
@@ -279,20 +278,20 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType) {
         checkModeConfig();
         log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
-        JobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
+        RuleAlteredJobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
         verifyDataConsistencyCheck(jobConfig);
         return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, new Properties()));
     }
     
-    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final JobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
-        String jobId = jobConfig.getHandleConfig().getJobId();
+    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
+        String jobId = jobConfig.getJobId();
         Map<String, DataConsistencyCheckResult> result = new DataConsistencyChecker(jobConfig).check(calculator);
         log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getType(), result);
         PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
         return result;
     }
     
-    private void verifyDataConsistencyCheck(final JobConfiguration jobConfig) {
+    private void verifyDataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig) {
         verifyManualMode(jobConfig);
         verifySourceWritingStopped(jobConfig);
     }
@@ -319,7 +318,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         checkModeConfig();
         log.info("Switch cluster configuration for job {}", jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+        RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
         verifyManualMode(jobConfig);
         verifyJobNotStopped(jobConfigPOJO);
         verifyJobNotCompleted(jobConfig);
@@ -327,8 +326,8 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     }
     
     @Override
-    public void switchClusterConfiguration(final JobConfiguration jobConfig) {
-        String jobId = jobConfig.getHandleConfig().getJobId();
+    public void switchClusterConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+        String jobId = jobConfig.getJobId();
         RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
         if (isDataConsistencyCheckNeeded(ruleAlteredContext)) {
@@ -337,8 +336,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
                 throw new PipelineVerifyFailedException("Data consistency check is not finished or failed.");
             }
         }
-        WorkflowConfiguration workflowConfig = jobConfig.getWorkflowConfig();
-        ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(workflowConfig.getDatabaseName(), workflowConfig.getActiveVersion(), workflowConfig.getNewVersion());
+        ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(jobConfig.getDatabaseName(), jobConfig.getActiveVersion(), jobConfig.getNewVersion());
         ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
         // TODO rewrite job status update after job progress structure refactor
         RuleAlteredJobSchedulerCenter.updateJobStatus(jobId, JobStatus.FINISHED);
@@ -363,11 +361,11 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
     }
     
     @Override
-    public JobConfiguration getJobConfig(final String jobId) {
+    public RuleAlteredJobConfiguration getJobConfig(final String jobId) {
         return getJobConfig(getElasticJobConfigPOJO(jobId));
     }
     
-    private JobConfiguration getJobConfig(final JobConfigurationPOJO elasticJobConfigPOJO) {
-        return YamlEngine.unmarshal(elasticJobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
+    private RuleAlteredJobConfiguration getJobConfig(final JobConfigurationPOJO elasticJobConfigPOJO) {
+        return YamlEngine.unmarshal(elasticJobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
     }
 }
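
(Since the whole job configuration is now one flat object, the YAML round trip through the registry center reduces to the following; the boolean flag mirrors the unmarshal call above.)

    String jobParameter = YamlEngine.marshal(jobConfig);
    RuleAlteredJobConfiguration restored = YamlEngine.unmarshal(jobParameter, RuleAlteredJobConfiguration.class, true);
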
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index d477a5e5133..38370424eda 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -68,13 +68,14 @@ import java.util.concurrent.TimeUnit;
  */
 public final class DataConsistencyChecker {
     
-    private final JobConfiguration jobConfig;
+    // TODO remove jobConfig dependency, so this checker can be reused by other job types
+    private final RuleAlteredJobConfiguration jobConfig;
     
     private final Collection<String> logicTableNames;
     
-    public DataConsistencyChecker(final JobConfiguration jobConfig) {
+    public DataConsistencyChecker(final RuleAlteredJobConfiguration jobConfig) {
         this.jobConfig = jobConfig;
-        logicTableNames = jobConfig.getHandleConfig().splitLogicTableNames();
+        logicTableNames = jobConfig.splitLogicTableNames();
     }
     
     /**
@@ -96,12 +97,10 @@ public final class DataConsistencyChecker {
     }
     
     private Map<String, DataConsistencyCountCheckResult> checkCount() {
-        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + jobConfig.getHandleConfig().getJobIdDigest() + "-count-check-%d");
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-count-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
-        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
-                jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
-        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
-                jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
+        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
+        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
         Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(logicTableNames.size(), 1);
         try (
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
@@ -131,6 +130,11 @@ public final class DataConsistencyChecker {
         }
     }
     
+    // TODO use digest (crc32, murmurhash)
+    private String getJobIdDigest(final String jobId) {
+        return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
+    }
+    
     private long count(final DataSource dataSource, final String table, final DatabaseType databaseType) {
         try (
                 Connection connection = dataSource.getConnection();
@@ -144,16 +148,16 @@ public final class DataConsistencyChecker {
     }
     
     private Map<String, DataConsistencyContentCheckResult> checkData(final DataConsistencyCalculateAlgorithm calculator) {
-        PipelineDataSourceConfiguration sourceDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getPipelineConfig().getSource());
-        PipelineDataSourceConfiguration targetDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getPipelineConfig().getTarget());
-        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + jobConfig.getHandleConfig().getJobIdDigest() + "-data-check-%d");
+        PipelineDataSourceConfiguration sourceDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getSource());
+        PipelineDataSourceConfiguration targetDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getTarget());
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
         Map<String, DataConsistencyContentCheckResult> result = new HashMap<>(logicTableNames.size(), 1);
         try (
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
                 PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
-            Map<String, TableMetaData> tableMetaDataMap = getTableMetaDataMap(jobConfig.getWorkflowConfig().getDatabaseName());
+            Map<String, TableMetaData> tableMetaDataMap = getTableMetaDataMap(jobConfig.getDatabaseName());
             logicTableNames.forEach(each -> {
                 // TODO put to preparer
                 if (!tableMetaDataMap.containsKey(each)) {
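
(The interim getJobIdDigest above simply truncates the job id to six characters; one possible shape for the digest TODO, sketched with the JDK's CRC32. This is an assumption, not part of this commit.)

    private static String crc32Digest(final String jobId) {
        java.util.zip.CRC32 crc32 = new java.util.zip.CRC32();
        crc32.update(jobId.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return Long.toHexString(crc32.getValue());
    }
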
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 49b0f1de24b..773f6116fbd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.execute;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
@@ -65,22 +65,23 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
             if (deleted || disabled) {
                 log.info("jobId={}, deleted={}, disabled={}", jobConfigPOJO.getJobName(), deleted, disabled);
                 RuleAlteredJobSchedulerCenter.stop(jobConfigPOJO.getJobName());
-                JobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
+                // TODO refactor: dispatch to different job types
+                RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
                 if (deleted) {
                     new RuleAlteredJobPreparer().cleanup(jobConfig);
-                } else if (RuleAlteredJobProgressDetector.isJobSuccessful(jobConfig.getHandleConfig().getJobShardingCount(), ruleAlteredJobAPI.getProgress(jobConfig).values())) {
+                } else if (RuleAlteredJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(), ruleAlteredJobAPI.getProgress(jobConfig).values())) {
                     log.info("isJobSuccessful=true");
                     new RuleAlteredJobPreparer().cleanup(jobConfig);
                 }
-                ScalingReleaseDatabaseLevelLockEvent releaseLockEvent = new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getWorkflowConfig().getDatabaseName());
+                ScalingReleaseDatabaseLevelLockEvent releaseLockEvent = new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getDatabaseName());
                 ShardingSphereEventBus.getInstance().post(releaseLockEvent);
                 return;
             }
             switch (event.getType()) {
                 case ADDED:
                 case UPDATED:
-                    JobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
-                    String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+                    RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+                    String databaseName = jobConfig.getDatabaseName();
                     if (PipelineSimpleLock.getInstance().tryLock(databaseName, 1000)) {
                         execute(jobConfigPOJO);
                     } else {
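
(For the dispatch TODO above, the eventual shape could branch on the job type already encoded into the job id, see RuleAlteredJobId in the configuration class; parseJobType below is a hypothetical helper, not part of this commit.)

    // hypothetical: recover the job type from the elastic job name (= job id)
    JobType jobType = parseJobType(jobConfigPOJO.getJobName());
    switch (jobType) {
        case RULE_ALTERED:
            new RuleAlteredJobPreparer().cleanup(jobConfig);
            break;
        default:
            log.warn("unsupported job type: {}", jobType);
    }
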
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 8a6006bde9d..b755976eec2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -55,7 +55,7 @@ public final class FinishedCheckJob implements SimpleJob {
             }
             try {
                 // TODO refactor: dispatch to different job types
-                JobConfiguration jobConfig = YamlEngine.unmarshal(jobInfo.getJobParameter(), JobConfiguration.class, true);
+                RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobInfo.getJobParameter(), RuleAlteredJobConfiguration.class, true);
                 RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
                 if (null == ruleAlteredContext.getCompletionDetectAlgorithm()) {
                     log.info("completionDetector not configured, auto switch will not be enabled. You could query job progress and switch config manually with DistSQL.");
@@ -67,7 +67,7 @@ public final class FinishedCheckJob implements SimpleJob {
                 }
                 log.info("scaling job {} almost finished.", jobId);
                 RowBasedJobLockAlgorithm sourceWritingStopAlgorithm = ruleAlteredContext.getSourceWritingStopAlgorithm();
-                String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+                String databaseName = jobConfig.getDatabaseName();
                 try {
                     if (null != sourceWritingStopAlgorithm) {
                         sourceWritingStopAlgorithm.lock(databaseName, jobId + "");
@@ -109,14 +109,14 @@ public final class FinishedCheckJob implements SimpleJob {
         return flag;
     }
     
-    private boolean dataConsistencyCheck(final JobConfiguration jobConfig) {
-        String jobId = jobConfig.getHandleConfig().getJobId();
+    private boolean dataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig) {
+        String jobId = jobConfig.getJobId();
         log.info("dataConsistencyCheck for job {}", jobId);
         return ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, ruleAlteredJobAPI.dataConsistencyCheck(jobConfig));
     }
     
-    private void switchClusterConfiguration(final String databaseName, final JobConfiguration jobConfig, final RuleBasedJobLockAlgorithm checkoutLockAlgorithm) {
-        String jobId = jobConfig.getHandleConfig().getJobId();
+    private void switchClusterConfiguration(final String databaseName, final RuleAlteredJobConfiguration jobConfig, final RuleBasedJobLockAlgorithm checkoutLockAlgorithm) {
+        String jobId = jobConfig.getJobId();
         try {
             if (null != checkoutLockAlgorithm) {
                 checkoutLockAlgorithm.lock(databaseName, jobId + "");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 1f423ff128a..5570b5d6fa3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
@@ -53,12 +53,12 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
     
     private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple primary keys for table", "already exists"};
     
-    protected final PipelineDataSourceWrapper getSourceCachedDataSource(final PipelineConfiguration pipelineConfig, final PipelineDataSourceManager dataSourceManager) {
-        return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter()));
+    protected final PipelineDataSourceWrapper getSourceCachedDataSource(final RuleAlteredJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
+        return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
     }
     
-    protected final PipelineDataSourceWrapper getTargetCachedDataSource(final PipelineConfiguration pipelineConfig, final PipelineDataSourceManager dataSourceManager) {
-        return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter()));
+    protected final PipelineDataSourceWrapper getTargetCachedDataSource(final RuleAlteredJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
+        return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter()));
     }
     
     protected final void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index c24bce303d0..41233157c1a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 
@@ -35,7 +35,7 @@ public final class PrepareTargetTablesParameter {
     private final JobDataNodeLine tablesFirstDataNodes;
     
     @NonNull
-    private final PipelineConfiguration pipelineConfiguration;
+    private final RuleAlteredJobConfiguration jobConfig;
     
     @NonNull
     private final PipelineDataSourceManager dataSourceManager;
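
(With the rename, a concrete DataSourcePreparer reads everything from the parameter object; a minimal usage sketch, assuming the Lombok-generated getters on PrepareTargetTablesParameter above.)

    @Override
    public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
        PipelineDataSourceWrapper targetDataSource = getTargetCachedDataSource(parameter.getJobConfig(), parameter.getDataSourceManager());
        // create target tables on targetDataSource, driven by parameter.getTablesFirstDataNodes()
    }
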
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 08c25ce57aa..b437dbfe324 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -42,8 +42,8 @@ public final class RuleAlteredJob implements SimpleJob {
     @Override
     public void execute(final ShardingContext shardingContext) {
         log.info("Execute job {}-{}", shardingContext.getJobName(), shardingContext.getShardingItem());
-        JobConfiguration jobConfig = YamlEngine.unmarshal(shardingContext.getJobParameter(), JobConfiguration.class, true);
-        jobConfig.getHandleConfig().setJobShardingItem(shardingContext.getShardingItem());
+        RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(shardingContext.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+        jobConfig.setJobShardingItem(shardingContext.getShardingItem());
         RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
         jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem()));
         jobContext.setJobPreparer(jobPreparer);
@@ -56,7 +56,7 @@ public final class RuleAlteredJob implements SimpleJob {
             RuleAlteredJobSchedulerCenter.stop(shardingContext.getJobName());
             jobContext.setStatus(JobStatus.PREPARING_FAILURE);
             governanceRepositoryAPI.persistJobProgress(jobContext);
-            ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getWorkflowConfig().getDatabaseName());
+            ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getDatabaseName());
             ShardingSphereEventBus.getInstance().post(event);
             throw ex;
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 32562af86ab..aa916d6a04a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -23,7 +23,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -59,7 +59,7 @@ public final class RuleAlteredJobContext {
     
     private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
     
-    private final JobConfiguration jobConfig;
+    private final RuleAlteredJobConfiguration jobConfig;
     
     private final RuleAlteredContext ruleAlteredContext;
     
@@ -81,13 +81,13 @@ public final class RuleAlteredJobContext {
     
     private RuleAlteredJobPreparer jobPreparer;
     
-    public RuleAlteredJobContext(final JobConfiguration jobConfig) {
+    public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig) {
         ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         this.jobConfig = jobConfig;
         jobConfig.buildHandleConfig();
-        jobId = jobConfig.getHandleConfig().getJobId();
-        shardingItem = jobConfig.getHandleConfig().getJobShardingItem();
-        taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig.getPipelineConfig(), jobConfig.getHandleConfig(), ruleAlteredContext.getOnRuleAlteredActionConfig());
+        jobId = jobConfig.getJobId();
+        shardingItem = jobConfig.getJobShardingItem();
+        taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, ruleAlteredContext.getOnRuleAlteredActionConfig());
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 0138c104663..5f97ac40df7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -94,9 +94,9 @@ public final class RuleAlteredJobPreparer {
     }
     
     private void prepareAndCheckTargetWithLock(final RuleAlteredJobContext jobContext) {
-        JobConfiguration jobConfig = jobContext.getJobConfig();
+        RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
         // TODO the lock will be replaced
-        String lockName = "prepare-" + jobConfig.getHandleConfig().getJobId();
+        String lockName = "prepare-" + jobConfig.getJobId();
         ShardingSphereLock lock = PipelineContext.getContextManager().getInstanceContext().getLockContext().getOrCreateGlobalLock(lockName);
         if (lock.tryLock(lockName, 1)) {
             try {
@@ -128,19 +128,19 @@ public final class RuleAlteredJobPreparer {
         }
     }
     
-    private void prepareTarget(final JobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
-        Optional<DataSourcePreparer> dataSourcePreparer = EnvironmentCheckerFactory.getDataSourcePreparer(jobConfig.getHandleConfig().getTargetDatabaseType());
+    private void prepareTarget(final RuleAlteredJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
+        Optional<DataSourcePreparer> dataSourcePreparer = EnvironmentCheckerFactory.getDataSourcePreparer(jobConfig.getTargetDatabaseType());
         if (!dataSourcePreparer.isPresent()) {
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
-        JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes());
-        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getPipelineConfig(), dataSourceManager);
+        JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
+        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig, dataSourceManager);
         dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
     }
     
     private void checkSourceDataSource(final RuleAlteredJobContext jobContext) {
-        DataSourceChecker dataSourceChecker = TypedSPIRegistry.getRegisteredService(DataSourceChecker.class, jobContext.getJobConfig().getHandleConfig().getSourceDatabaseType());
+        DataSourceChecker dataSourceChecker = TypedSPIRegistry.getRegisteredService(DataSourceChecker.class, jobContext.getJobConfig().getSourceDatabaseType());
         Collection<PipelineDataSourceWrapper> sourceDataSources = Collections.singleton(jobContext.getSourceDataSource());
         dataSourceChecker.checkConnection(sourceDataSources);
         dataSourceChecker.checkPrivilege(sourceDataSources);
@@ -148,7 +148,7 @@ public final class RuleAlteredJobPreparer {
     }
     
     private void checkTargetDataSource(final RuleAlteredJobContext jobContext, final PipelineDataSourceWrapper targetDataSource) {
-        DataSourceChecker dataSourceChecker = TypedSPIRegistry.getRegisteredService(DataSourceChecker.class, jobContext.getJobConfig().getHandleConfig().getTargetDatabaseType());
+        DataSourceChecker dataSourceChecker = TypedSPIRegistry.getRegisteredService(DataSourceChecker.class, jobContext.getJobConfig().getTargetDatabaseType());
         Collection<PipelineDataSourceWrapper> targetDataSources = Collections.singletonList(targetDataSource);
         dataSourceChecker.checkConnection(targetDataSources);
         dataSourceChecker.checkTargetTable(targetDataSources, jobContext.getTaskConfig().getImporterConfig().getShardingColumnsMap().keySet());
@@ -166,7 +166,7 @@ public final class RuleAlteredJobPreparer {
         PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
         taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
         PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
-        IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getHandleConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
+        IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getJobConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
                 pipelineChannelFactory, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine);
         jobContext.getIncrementalTasks().add(incrementalTask);
     }
@@ -179,7 +179,7 @@ public final class RuleAlteredJobPreparer {
                 return positionOptional.get();
             }
         }
-        String databaseType = taskConfig.getHandleConfig().getSourceDatabaseType();
+        String databaseType = taskConfig.getJobConfig().getSourceDatabaseType();
         DataSource dataSource = dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig());
         return PositionInitializerFactory.getPositionInitializer(databaseType).init(dataSource);
     }
@@ -189,7 +189,7 @@ public final class RuleAlteredJobPreparer {
      *
      * @param jobConfig job configuration
      */
-    public void cleanup(final JobConfiguration jobConfig) {
+    public void cleanup(final RuleAlteredJobConfiguration jobConfig) {
         try {
             cleanup0(jobConfig);
         } catch (final SQLException ex) {
@@ -197,11 +197,11 @@ public final class RuleAlteredJobPreparer {
         }
     }
     
-    private void cleanup0(final JobConfiguration jobConfig) throws SQLException {
-        DatabaseType databaseType = DatabaseTypeRegistry.getActualDatabaseType(jobConfig.getHandleConfig().getSourceDatabaseType());
+    private void cleanup0(final RuleAlteredJobConfiguration jobConfig) throws SQLException {
+        DatabaseType databaseType = DatabaseTypeRegistry.getActualDatabaseType(jobConfig.getSourceDatabaseType());
         PositionInitializer positionInitializer = PositionInitializerFactory.getPositionInitializer(databaseType.getName());
         ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory
-                .newInstance(jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
+                .newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
         for (DataSourceProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceDataSourceConfig.getRootConfig()).values()) {
             try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
                 positionInitializer.destroy(dataSource);
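
The preparer hunks above show the central mechanical change of this commit: call sites stop traversing getHandleConfig() and getPipelineConfig() and read settings directly off RuleAlteredJobConfiguration. A minimal before/after sketch (the getter names are taken from the hunks above; the wrapper class itself is illustrative only):

    import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
    
    final class FlattenedAccessSketch {
        
        static String prepareLockName(final RuleAlteredJobConfiguration jobConfig) {
            // Before: "prepare-" + jobConfig.getHandleConfig().getJobId()
            // After: the job id is a first-class field of the job configuration.
            return "prepare-" + jobConfig.getJobId();
        }
    }
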
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index b6718c37502..4043474cafc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -104,7 +104,7 @@ public final class RuleAlteredJobScheduler implements Runnable {
                 log.error("Inventory task execute failed.", throwable);
                 stop();
                 jobContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
-                ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getWorkflowConfig().getDatabaseName());
+                ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
                 ShardingSphereEventBus.getInstance().post(event);
             }
         };
@@ -138,7 +138,7 @@ public final class RuleAlteredJobScheduler implements Runnable {
                 log.error("Incremental task execute failed.", throwable);
                 stop();
                 jobContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
-                ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getWorkflowConfig().getDatabaseName());
+                ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
                 ShardingSphereEventBus.getInstance().post(event);
             }
         };
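
The two scheduler hunks are identical except for the JobStatus they set. A hypothetical helper (onTaskFailure does not exist in this commit) could fold the inventory and incremental failure callbacks together using only the calls visible above:

    // Hypothetical deduplication sketch; failureStatus is the only thing that
    // differs between the two callbacks in this file.
    private void onTaskFailure(final RuleAlteredJobContext jobContext, final Throwable throwable, final JobStatus failureStatus) {
        log.error("Task execute failed.", throwable);
        stop();
        jobContext.setStatus(failureStatus);
        ShardingSphereEventBus.getInstance().post(
                new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName()));
    }
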
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index c2cf39f75e2..89ea2b67447 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -22,11 +22,8 @@ import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
@@ -122,11 +119,11 @@ public final class RuleAlteredJobWorker {
      * @param jobConfig job configuration
      * @return rule altered context
      */
-    public static RuleAlteredContext createRuleAlteredContext(final JobConfiguration jobConfig) {
+    public static RuleAlteredContext createRuleAlteredContext(final RuleAlteredJobConfiguration jobConfig) {
         YamlRootConfiguration targetRootConfig = getYamlRootConfig(jobConfig);
         YamlRuleConfiguration yamlRuleConfig = null;
         for (YamlRuleConfiguration each : targetRootConfig.getRules()) {
-            if (jobConfig.getWorkflowConfig().getAlteredRuleYamlClassNameTablesMap().containsKey(each.getClass().getName())) {
+            if (jobConfig.getAlteredRuleYamlClassNameTablesMap().containsKey(each.getClass().getName())) {
                 yamlRuleConfig = each;
                 break;
             }
@@ -151,14 +148,12 @@ public final class RuleAlteredJobWorker {
      * @param jobConfig job configuration
      * @return YAML root configuration
      */
-    private static YamlRootConfiguration getYamlRootConfig(final JobConfiguration jobConfig) {
-        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
-                jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
+    private static YamlRootConfiguration getYamlRootConfig(final RuleAlteredJobConfiguration jobConfig) {
+        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
         if (targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
             return ((ShardingSpherePipelineDataSourceConfiguration) targetDataSourceConfig).getRootConfig();
         }
-        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
-                jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
+        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
         return ((ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig).getRootConfig();
     }
     
@@ -174,7 +169,7 @@ public final class RuleAlteredJobWorker {
             log.warn("There is uncompleted job with the same database name, please handle it first, current job will be ignored");
             return;
         }
-        Optional<JobConfiguration> jobConfigOptional = createJobConfig(event);
+        Optional<RuleAlteredJobConfiguration> jobConfigOptional = createJobConfig(event);
         if (jobConfigOptional.isPresent()) {
             PipelineJobAPIFactory.newInstance().start(jobConfigOptional.get());
         } else {
@@ -185,7 +180,7 @@ public final class RuleAlteredJobWorker {
         }
     }
     
-    private Optional<JobConfiguration> createJobConfig(final StartScalingEvent event) {
+    private Optional<RuleAlteredJobConfiguration> createJobConfig(final StartScalingEvent event) {
         YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getDatabaseName(), event.getSourceDataSource(), event.getSourceRule());
         YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getDatabaseName(), event.getTargetDataSource(), event.getTargetRule());
         Map<String, List<String>> alteredRuleYamlClassNameTablesMap = new HashMap<>();
@@ -209,9 +204,14 @@ public final class RuleAlteredJobWorker {
             log.error("more than 1 rule altered");
             throw new PipelineJobCreationException("more than 1 rule altered");
         }
-        WorkflowConfiguration workflowConfig = new WorkflowConfiguration(event.getDatabaseName(), alteredRuleYamlClassNameTablesMap, event.getActiveVersion(), event.getNewVersion());
-        PipelineConfiguration pipelineConfig = getPipelineConfiguration(sourceRootConfig, targetRootConfig);
-        return Optional.of(new JobConfiguration(workflowConfig, pipelineConfig));
+        RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration();
+        result.setDatabaseName(event.getDatabaseName());
+        result.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap);
+        result.setActiveVersion(event.getActiveVersion());
+        result.setNewVersion(event.getNewVersion());
+        result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
+        result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
+        return Optional.of(result);
     }
     
     private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> groupSourceTargetRuleConfigsByType(final Collection<YamlRuleConfiguration> sourceRules,
@@ -231,13 +231,6 @@ public final class RuleAlteredJobWorker {
         return result;
     }
     
-    private PipelineConfiguration getPipelineConfiguration(final YamlRootConfiguration sourceRootConfig, final YamlRootConfiguration targetRootConfig) {
-        PipelineConfiguration result = new PipelineConfiguration();
-        result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
-        result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
-        return result;
-    }
-    
     private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final YamlRootConfiguration yamlConfig) {
         PipelineDataSourceConfiguration config = new ShardingSpherePipelineDataSourceConfiguration(yamlConfig);
         YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
@@ -280,14 +273,12 @@ public final class RuleAlteredJobWorker {
     /**
      * Build task configuration.
      *
-     * @param pipelineConfig pipeline configuration
-     * @param handleConfig handle configuration
+     * @param jobConfig job configuration
      * @param onRuleAlteredActionConfig action configuration
      * @return task configuration
      */
-    public static TaskConfiguration buildTaskConfig(final PipelineConfiguration pipelineConfig,
-                                                    final HandleConfiguration handleConfig, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
-        return RuleAlteredJobConfigurationPreparerFactory.newInstance().createTaskConfiguration(pipelineConfig, handleConfig, onRuleAlteredActionConfig);
+    public static TaskConfiguration buildTaskConfig(final RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
+        return RuleAlteredJobConfigurationPreparerFactory.newInstance().createTaskConfiguration(jobConfig, onRuleAlteredActionConfig);
     }
     
     private boolean hasUncompletedJobOfSameDatabaseName(final String databaseName) {
@@ -297,8 +288,8 @@ public final class RuleAlteredJobWorker {
                     .allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) {
                 continue;
             }
-            JobConfiguration jobConfiguration = YamlEngine.unmarshal(each.getJobParameter(), JobConfiguration.class, true);
-            if (hasUncompletedJobOfSameDatabaseName(jobConfiguration, each.getJobId(), databaseName)) {
+            RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(each.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+            if (hasUncompletedJobOfSameDatabaseName(jobConfig, each.getJobId(), databaseName)) {
                 result = true;
                 break;
             }
@@ -306,14 +297,8 @@ public final class RuleAlteredJobWorker {
         return !result;
     }
     
-    private boolean hasUncompletedJobOfSameDatabaseName(final JobConfiguration jobConfig, final String jobId, final String currentDatabaseName) {
-        HandleConfiguration handleConfig = jobConfig.getHandleConfig();
-        WorkflowConfiguration workflowConfig;
-        if (null == handleConfig || null == (workflowConfig = jobConfig.getWorkflowConfig())) {
-            log.warn("handleConfig or workflowConfig null, jobId={}", jobId);
-            return false;
-        }
-        return currentDatabaseName.equals(workflowConfig.getDatabaseName());
+    private boolean hasUncompletedJobOfSameDatabaseName(final RuleAlteredJobConfiguration jobConfig, final String jobId, final String currentDatabaseName) {
+        return currentDatabaseName.equals(jobConfig.getDatabaseName());
     }
     
     /**
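
Note how removing the wrapper classes also removes a failure mode: the old overload had to guard against a null handleConfig or workflowConfig before comparing database names, while the flat class carries databaseName directly. The duplicate-database check reduces to roughly the following (jobParameter is a placeholder for the persisted job parameter string):

    RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobParameter, RuleAlteredJobConfiguration.class, true);
    boolean duplicate = currentDatabaseName.equals(jobConfig.getDatabaseName());
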
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 851749b0189..cc8c76f4707 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
@@ -177,8 +177,8 @@ public final class InventoryTaskSplitter {
     
     private Collection<IngestPosition<?>> getPositionByPrimaryKeyRange(final RuleAlteredJobContext jobContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
         Collection<IngestPosition<?>> result = new ArrayList<>();
-        JobConfiguration jobConfig = jobContext.getJobConfig();
-        String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getHandleConfig().getSourceDatabaseType())
+        RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
+        String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getSourceDatabaseType())
                 .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(), dumperConfig.getPrimaryKey());
         try (
                 Connection connection = dataSource.getConnection();
@@ -186,7 +186,7 @@ public final class InventoryTaskSplitter {
             long beginId = 0;
             for (int i = 0; i < Integer.MAX_VALUE; i++) {
                 ps.setLong(1, beginId);
-                ps.setLong(2, jobConfig.getHandleConfig().getShardingSize());
+                ps.setLong(2, jobConfig.getShardingSize());
                 try (ResultSet rs = ps.executeQuery()) {
                     if (!rs.next()) {
                         log.info("getPositionByPrimaryKeyRange, rs.next false, break");
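
For orientation, getPositionByPrimaryKeyRange pages through the source table in chunks of jobConfig.getShardingSize() rows. A sketch of the loop shape follows; the assumption that the first result column is the chunk's upper primary key bound is ours, not something visible in this hunk:

    String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getSourceDatabaseType())
            .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(), dumperConfig.getPrimaryKey());
    List<Long> splitPoints = new ArrayList<>();
    try (
            Connection connection = dataSource.getConnection();
            PreparedStatement ps = connection.prepareStatement(sql)) {
        long beginId = 0;
        while (true) {
            ps.setLong(1, beginId);
            ps.setLong(2, jobConfig.getShardingSize());
            try (ResultSet rs = ps.executeQuery()) {
                if (!rs.next()) {
                    break;
                }
                long endId = rs.getLong(1);  // assumed: upper primary key of this chunk
                splitPoints.add(endId);
                beginId = endId + 1;
            }
        }
    }
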
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index f01419f6db0..dccd3b0c979 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.scaling.core.job.environment;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
@@ -43,15 +43,15 @@ public final class ScalingEnvironmentManager {
      * @throws SQLException SQL exception
      */
     // TODO seems it should be removed, dangerous to use
-    public void cleanupTargetTables(final JobConfiguration jobConfig) throws SQLException {
-        Collection<String> tables = jobConfig.getHandleConfig().splitLogicTableNames();
+    public void cleanupTargetTables(final RuleAlteredJobConfiguration jobConfig) throws SQLException {
+        Collection<String> tables = jobConfig.splitLogicTableNames();
         log.info("cleanupTargetTables, tables={}", tables);
-        YamlPipelineDataSourceConfiguration target = jobConfig.getPipelineConfig().getTarget();
+        YamlPipelineDataSourceConfiguration target = jobConfig.getTarget();
         try (
                 PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
                 Connection connection = dataSource.getConnection()) {
             for (String each : tables) {
-                String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
+                String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getTargetDatabaseType()).buildTruncateSQL(each);
                 try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                     preparedStatement.execute();
                 }
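
Callers such as job reset keep a one-line entry point, sketched below; as the TODO above warns, this truncates every target logic table, so it is only intended for resetting an unfinished job (jobId is a placeholder):

    RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId);
    new ScalingEnvironmentManager().cleanupTargetTables(jobConfig);
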
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 0657f5fb720..32dced0adf4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
@@ -44,11 +44,11 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
     
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
-        PipelineConfiguration pipelineConfig = parameter.getPipelineConfiguration();
+        RuleAlteredJobConfiguration jobConfig = parameter.getJobConfig();
         PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
         try (
-                Connection sourceConnection = getSourceCachedDataSource(pipelineConfig, dataSourceManager).getConnection();
-                Connection targetConnection = getTargetCachedDataSource(pipelineConfig, dataSourceManager).getConnection()) {
+                Connection sourceConnection = getSourceCachedDataSource(jobConfig, dataSourceManager).getConnection();
+                Connection targetConnection = getTargetCachedDataSource(jobConfig, dataSourceManager).getConnection()) {
             Collection<String> logicTableNames = parameter.getTablesFirstDataNodes().getEntries().stream().map(JobDataNodeEntry::getLogicTableName).collect(Collectors.toList());
             for (String each : logicTableNames) {
                 String createTableSQL = getCreateTableSQL(sourceConnection, each);
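
Dialect preparers like this one are not instantiated directly; they are resolved by target database type, mirroring the prepareTarget() hunk earlier in this diff:

    Optional<DataSourcePreparer> dataSourcePreparer = EnvironmentCheckerFactory.getDataSourcePreparer(jobConfig.getTargetDatabaseType());
    if (dataSourcePreparer.isPresent()) {
        JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
        dataSourcePreparer.get().prepareTargetTables(new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig, dataSourceManager));
    }
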
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index ccf739ffa5e..9ae4828f30e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -50,7 +50,7 @@ public final class MySQLDataSourcePreparerTest {
     private PrepareTargetTablesParameter prepareTargetTablesParameter;
     
     @Mock
-    private PipelineConfiguration pipelineConfig;
+    private RuleAlteredJobConfiguration jobConfig;
     
     @Mock
     private YamlPipelineDataSourceConfiguration sourceYamlPipelineDataSourceConfiguration;
@@ -76,13 +76,13 @@ public final class MySQLDataSourcePreparerTest {
         when(mockPipelineDataSourceManager.getDataSource(same(sourceScalingDataSourceConfig))).thenReturn(sourceDataSourceWrapper);
         when(mockPipelineDataSourceManager.getDataSource(same(targetScalingDataSourceConfig))).thenReturn(targetDataSourceWrapper);
         when(prepareTargetTablesParameter.getDataSourceManager()).thenReturn(mockPipelineDataSourceManager);
-        when(pipelineConfig.getSource()).thenReturn(sourceYamlPipelineDataSourceConfiguration);
-        when(pipelineConfig.getSource().getType()).thenReturn("ShardingSphereJDBC");
-        when(pipelineConfig.getSource().getParameter()).thenReturn("source");
-        when(pipelineConfig.getTarget()).thenReturn(targetYamlPipelineDataSourceConfiguration);
-        when(pipelineConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
-        when(pipelineConfig.getTarget().getParameter()).thenReturn("target");
-        when(prepareTargetTablesParameter.getPipelineConfiguration()).thenReturn(pipelineConfig);
+        when(jobConfig.getSource()).thenReturn(sourceYamlPipelineDataSourceConfiguration);
+        when(jobConfig.getSource().getType()).thenReturn("ShardingSphereJDBC");
+        when(jobConfig.getSource().getParameter()).thenReturn("source");
+        when(jobConfig.getTarget()).thenReturn(targetYamlPipelineDataSourceConfiguration);
+        when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
+        when(jobConfig.getTarget().getParameter()).thenReturn("target");
+        when(prepareTargetTablesParameter.getJobConfig()).thenReturn(jobConfig);
         when(prepareTargetTablesParameter.getTablesFirstDataNodes()).thenReturn(new JobDataNodeLine(Collections.emptyList()));
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index c0690d3170d..0dcdf686db0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -60,7 +60,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
             throw new PipelineJobPrepareFailedException("get table definitions failed.", ex);
         }
         Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions);
-        try (Connection targetConnection = getTargetCachedDataSource(parameter.getPipelineConfiguration(), parameter.getDataSourceManager()).getConnection()) {
+        try (Connection targetConnection = getTargetCachedDataSource(parameter.getJobConfig(), parameter.getDataSourceManager()).getConnection()) {
             for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
                 for (String each : entry.getValue()) {
                     executeTargetTableSQL(targetConnection, each);
@@ -75,7 +75,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
     private Collection<ActualTableDefinition> getActualTableDefinitions(final PrepareTargetTablesParameter parameter) throws SQLException {
         Collection<ActualTableDefinition> result = new ArrayList<>();
         ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(
-                parameter.getPipelineConfiguration().getSource().getType(), parameter.getPipelineConfiguration().getSource().getParameter());
+                parameter.getJobConfig().getSource().getType(), parameter.getJobConfig().getSource().getParameter());
         // TODO reuse PipelineDataSourceManager
         try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
             for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSource [...]
index 8c55316f395..638cb4e3c00 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -83,10 +83,8 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
             throw new PipelineJobPrepareFailedException("get table definitions failed.", ex);
         }
         Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions);
-        
         try (
-                Connection targetConnection = getTargetCachedDataSource(parameter.getPipelineConfiguration(),
-                        parameter.getDataSourceManager()).getConnection()) {
+                Connection targetConnection = getTargetCachedDataSource(parameter.getJobConfig(), parameter.getDataSourceManager()).getConnection()) {
             for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
                 for (String each : entry.getValue()) {
                     executeTargetTableSQL(targetConnection, each);
@@ -124,8 +122,8 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
     private Collection<ActualTableDefinition> getActualTableDefinitions(final PrepareTargetTablesParameter parameter) throws SQLException {
         Collection<ActualTableDefinition> result = new LinkedList<>();
         ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig =
-                (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(parameter.getPipelineConfiguration().getSource().getType(),
-                        parameter.getPipelineConfiguration().getSource().getParameter());
+                (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(parameter.getJobConfig().getSource().getType(),
+                        parameter.getJobConfig().getSource().getParameter());
         try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
             for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
                 DataNode dataNode = each.getDataNodes().get(0);
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
index 0fa33438045..30a8a7ff919 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
@@ -20,8 +20,7 @@ package org.apache.shardingsphere.integration.data.pipeline.env;
 import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -82,11 +81,9 @@ public final class ITEnvironmentContext {
     }
     
     private static String createScalingConfiguration(final Map<String, YamlTableRuleConfiguration> tableRules) {
-        PipelineConfiguration pipelineConfig = new PipelineConfiguration();
-        pipelineConfig.setSource(createYamlPipelineDataSourceConfiguration(SourceConfiguration.getDockerConfiguration(tableRules)));
-        pipelineConfig.setTarget(createYamlPipelineDataSourceConfiguration(TargetConfiguration.getDockerConfiguration()));
-        JobConfiguration jobConfig = new JobConfiguration();
-        jobConfig.setPipelineConfig(pipelineConfig);
+        RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
+        jobConfig.setSource(createYamlPipelineDataSourceConfiguration(SourceConfiguration.getDockerConfiguration(tableRules)));
+        jobConfig.setTarget(createYamlPipelineDataSourceConfiguration(TargetConfiguration.getDockerConfiguration()));
         return new Gson().toJson(jobConfig);
     }
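
A side effect worth noting for this integration test: with source and target hoisted onto the job configuration, the Gson output no longer nests them under a pipelineConfig key. A round-trip sketch using the standard Gson API:

    Gson gson = new Gson();
    String json = gson.toJson(jobConfig);
    RuleAlteredJobConfiguration restored = gson.fromJson(json, RuleAlteredJobConfiguration.class);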
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 997b352a08f..e2e14c205c9 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -24,8 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -130,12 +129,12 @@ public final class RuleAlteredJobAPIImplTest {
     public void assertDataConsistencyCheck() {
         Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        JobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
-        if (null == jobConfig.getPipelineConfig()) {
-            log.error("pipelineConfig is null, jobConfig={}", YamlEngine.marshal(jobConfig));
+        RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
+        if (null == jobConfig.getSource()) {
+            log.error("source is null, jobConfig={}", YamlEngine.marshal(jobConfig));
         }
-        initTableData(jobConfig.getPipelineConfig());
-        String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+        initTableData(jobConfig);
+        String databaseName = jobConfig.getDatabaseName();
         ruleAlteredJobAPI.stopClusterWriteDB(databaseName, jobId.get());
         Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobId.get());
         ruleAlteredJobAPI.restoreClusterWriteDB(databaseName, jobId.get());
@@ -146,9 +145,9 @@ public final class RuleAlteredJobAPIImplTest {
     public void assertDataConsistencyCheckWithAlgorithm() {
         Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        JobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
-        initTableData(jobConfig.getPipelineConfig());
-        String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+        RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
+        initTableData(jobConfig);
+        String databaseName = jobConfig.getDatabaseName();
         ruleAlteredJobAPI.stopClusterWriteDB(databaseName, jobId.get());
         Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE");
         ruleAlteredJobAPI.restoreClusterWriteDB(databaseName, jobId.get());
@@ -197,11 +196,11 @@ public final class RuleAlteredJobAPIImplTest {
     
     @Test(expected = PipelineVerifyFailedException.class)
     public void assertSwitchClusterConfigurationAlreadyFinished() {
-        final JobConfiguration jobConfiguration = JobConfigurationBuilder.createJobConfiguration();
-        Optional<String> jobId = ruleAlteredJobAPI.start(jobConfiguration);
+        final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+        Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         final GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
-        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfiguration);
+        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
         jobContext.setInitProgress(new JobProgress());
         repositoryAPI.persistJobProgress(jobContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
@@ -211,12 +210,12 @@ public final class RuleAlteredJobAPIImplTest {
     
     @Test
     public void assertSwitchClusterConfigurationSucceed() {
-        final JobConfiguration jobConfiguration = JobConfigurationBuilder.createJobConfiguration();
-        jobConfiguration.getHandleConfig().setJobShardingItem(0);
-        Optional<String> jobId = ruleAlteredJobAPI.start(jobConfiguration);
+        final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+        jobConfig.setJobShardingItem(0);
+        Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
-        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfiguration);
+        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
         jobContext.setInitProgress(new JobProgress());
         repositoryAPI.persistJobProgress(jobContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
@@ -232,8 +231,8 @@ public final class RuleAlteredJobAPIImplTest {
     public void assertResetTargetTable() {
         Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        JobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
-        initTableData(jobConfig.getPipelineConfig());
+        RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
+        initTableData(jobConfig);
         ruleAlteredJobAPI.stop(jobId.get());
         ruleAlteredJobAPI.reset(jobId.get());
         Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobConfig);
@@ -241,10 +240,10 @@ public final class RuleAlteredJobAPIImplTest {
     }
     
     @SneakyThrows(SQLException.class)
-    private void initTableData(final PipelineConfiguration pipelineConfig) {
-        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter());
+    private void initTableData(final RuleAlteredJobConfiguration jobConfig) {
+        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
         initTableData(PipelineDataSourceCreatorFactory.getInstance(sourceDataSourceConfig.getType()).createPipelineDataSource(sourceDataSourceConfig.getDataSourceConfiguration()));
-        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter());
+        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
         initTableData(PipelineDataSourceCreatorFactory.getInstance(targetDataSourceConfig.getType()).createPipelineDataSource(targetDataSourceConfig.getDataSourceConfiguration()));
     }
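
The consistency-check tests above exercise a fixed sequence against RuleAlteredJobAPI; condensed, the flow now reads:

    Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
    RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
    String databaseName = jobConfig.getDatabaseName();
    ruleAlteredJobAPI.stopClusterWriteDB(databaseName, jobId.get());
    Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobId.get());
    ruleAlteredJobAPI.restoreClusterWriteDB(databaseName, jobId.get());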
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index 9ba77b75ddc..2522239fc4f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCalculateAlgorithm;
@@ -52,7 +52,7 @@ public final class DataConsistencyCheckerTest {
         assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
     }
     
-    private JobConfiguration createJobConfiguration() throws SQLException {
+    private RuleAlteredJobConfiguration createJobConfiguration() throws SQLException {
         RuleAlteredJobContext jobContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration());
         initTableData(jobContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
         initTableData(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
index d63686a922e..720de47b899 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.datasource;
 
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertTrue;
 
 public final class PipelineDataSourceManagerTest {
     
-    private JobConfiguration jobConfig;
+    private RuleAlteredJobConfiguration jobConfig;
     
     @Before
     public void setUp() {
@@ -47,7 +47,7 @@ public final class PipelineDataSourceManagerTest {
     public void assertGetDataSource() {
         PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
         DataSource actual = dataSourceManager.getDataSource(
-                PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter()));
+                PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
         assertThat(actual, instanceOf(PipelineDataSourceWrapper.class));
     }
     
@@ -55,9 +55,9 @@ public final class PipelineDataSourceManagerTest {
     public void assertClose() throws NoSuchFieldException, IllegalAccessException {
         try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
             dataSourceManager.getDataSource(
-                    PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter()));
+                    PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
             dataSourceManager.getDataSource(
-                    PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter()));
+                    PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter()));
             Map<?, ?> cachedDataSources = ReflectionUtil.getFieldValue(dataSourceManager, "cachedDataSources", Map.class);
             assertNotNull(cachedDataSources);
             assertThat(cachedDataSources.size(), is(2));
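
The manager caches one data source per configuration and closes all of them when it closes, which is what the reflection assertion above verifies. In application code the same contract looks like:

    try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
        DataSource source = dataSourceManager.getDataSource(
                PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
        DataSource target = dataSourceManager.getDataSource(
                PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter()));
        // both entries stay in the manager's cache until this block exits
    }
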
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index bc4a0bced79..484c998ec10 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -20,9 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import com.google.common.collect.ImmutableMap;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -46,17 +44,19 @@ public final class JobConfigurationBuilder {
      *
      * @return created job configuration
      */
-    public static JobConfiguration createJobConfiguration() {
-        JobConfiguration result = new JobConfiguration();
-        result.setWorkflowConfig(new WorkflowConfiguration("logic_db", ImmutableMap.of(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order")), 0, 1));
-        PipelineConfiguration pipelineConfig = new PipelineConfiguration();
-        pipelineConfig.setSource(createYamlPipelineDataSourceConfiguration(
+    public static RuleAlteredJobConfiguration createJobConfiguration() {
+        RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration();
+        result.setDatabaseName("logic_db");
+        result.setAlteredRuleYamlClassNameTablesMap(ImmutableMap.of(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order")));
+        result.setActiveVersion(0);
+        result.setNewVersion(1);
+        // TODO add autoTables in config file
+        result.setSource(createYamlPipelineDataSourceConfiguration(
                 new ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
-        pipelineConfig.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
-        result.setPipelineConfig(pipelineConfig);
+        result.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
         result.buildHandleConfig();
         int activeVersion = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
-        result.getHandleConfig().setJobId(generateJobId(activeVersion, "logic_db"));
+        result.setJobId(generateJobId(activeVersion, "logic_db"));
         return result;
     }
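
Since jobs persist this configuration as their job parameter (see the YamlEngine.unmarshal call in RuleAlteredJobWorker above), the builder output has to survive a YAML round trip; a quick sketch:

    RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
    String yamlText = YamlEngine.marshal(jobConfig);
    RuleAlteredJobConfiguration restored = YamlEngine.unmarshal(yamlText, RuleAlteredJobConfiguration.class, true);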
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
index 5e2b7845a8f..1600b0127f1 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
@@ -50,7 +50,7 @@ public final class RuleAlteredJobTest {
     @SuppressWarnings("unchecked")
     @SneakyThrows(ReflectiveOperationException.class)
     public void assertExecute() {
-        JobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+        RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
         initTableData(jobConfig);
         ShardingContext shardingContext = new ShardingContext("1", null, 2, YamlEngine.marshal(jobConfig), 0, null);
         new RuleAlteredJob().execute(shardingContext);
@@ -60,8 +60,8 @@ public final class RuleAlteredJobTest {
     }
     
     @SneakyThrows(SQLException.class)
-    private void initTableData(final JobConfiguration jobConfig) {
-        YamlPipelineDataSourceConfiguration source = jobConfig.getPipelineConfig().getSource();
+    private void initTableData(final RuleAlteredJobConfiguration jobConfig) {
+        YamlPipelineDataSourceConfiguration source = jobConfig.getSource();
         try (
                 PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter()));
                 Connection connection = dataSource.getConnection();
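Note that RuleAlteredJobTest hands the configuration to ShardingContext as YAML via YamlEngine.marshal(jobConfig), so the flattening also changes the serialized shape: the former workflowConfig/pipelineConfig/handleConfig sections become top-level keys. A round-trip sketch, assuming YamlEngine.unmarshal(String, Class) is available as it is elsewhere in the project:

    RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
    // The marshalled YAML now carries databaseName, jobId, source and target at the top level.
    String yaml = YamlEngine.marshal(jobConfig);
    RuleAlteredJobConfiguration restored = YamlEngine.unmarshal(yaml, RuleAlteredJobConfiguration.class);
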
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 5e8ad07de13..8c1736ec663 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.commons.io.FileUtils;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -41,6 +39,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
+import java.util.Collections;
 import java.util.Optional;
 
 import static org.junit.Assert.assertFalse;
@@ -56,8 +55,8 @@ public final class RuleAlteredJobWorkerTest {
     
     @Test(expected = PipelineJobCreationException.class)
     public void assertCreateRuleAlteredContextNoAlteredRule() {
-        JobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
-        jobConfig.setWorkflowConfig(new WorkflowConfiguration("logic_db", ImmutableMap.of(), 0, 1));
+        RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+        jobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
         RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
     }
     
@@ -90,8 +89,8 @@ public final class RuleAlteredJobWorkerTest {
     // TODO improve assertHasUncompletedJob; refactor hasUncompletedJobOfSameDatabaseName for easier unit testing
     // @Test
     public void assertHasUncompletedJob() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, IOException {
-        final JobConfiguration jobConfiguration = JobConfigurationBuilder.createJobConfiguration();
-        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfiguration);
+        final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
         jobContext.setStatus(JobStatus.PREPARING);
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
         repositoryAPI.persistJobProgress(jobContext);
@@ -99,7 +98,7 @@ public final class RuleAlteredJobWorkerTest {
         assertNotNull(jobConfigUrl);
         repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobContext.getJobId()), FileUtils.readFileToString(new File(jobConfigUrl.getFile())));
         Object result = ReflectionUtil.invokeMethod(new RuleAlteredJobWorker(), "hasUncompletedJobOfSameDatabaseName", new Class[]{String.class},
-                new String[]{jobConfiguration.getWorkflowConfig().getDatabaseName()});
+                new String[]{jobConfig.getDatabaseName()});
         assertFalse((Boolean) result);
     }
 }
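The alteredRuleYamlClassNameTablesMap setter used above replaces the map formerly passed to the WorkflowConfiguration constructor: it keys a YAML rule configuration class name to the logic tables being altered, and an empty map now fails fast with PipelineJobCreationException, as the first test in this file asserts. A sketch of populating it, mirroring JobConfigurationBuilder earlier in this diff (the value type is inferred from the singletonList usage there, and the mutable HashMap is illustrative; the builder itself uses ImmutableMap):

    // Declare which tables are affected by the altered sharding rule.
    Map<String, List<String>> alteredRuleTables = new HashMap<>();
    alteredRuleTables.put(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order"));
    jobConfig.setAlteredRuleYamlClassNameTablesMap(alteredRuleTables);
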
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index cd63cb74f82..5d60ea5c0f0 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -68,7 +68,7 @@ public final class InventoryTaskSplitterTest {
     
     @Test
     public void assertSplitInventoryDataWithEmptyTable() throws SQLException {
-        taskConfig.getHandleConfig().setShardingSize(10);
+        taskConfig.getJobConfig().setShardingSize(10);
         initEmptyTablePrimaryEnvironment(taskConfig.getDumperConfig());
         List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
@@ -79,7 +79,7 @@ public final class InventoryTaskSplitterTest {
     
     @Test
     public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
-        taskConfig.getHandleConfig().setShardingSize(10);
+        taskConfig.getJobConfig().setShardingSize(10);
         initIntPrimaryEnvironment(taskConfig.getDumperConfig());
         List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);